The Meson Build System
http://mesonbuild.com/
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
435 lines
14 KiB
435 lines
14 KiB
# Copyright 2013-2014 The Meson development team |
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
|
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
import os |
|
import io |
|
import sys |
|
import time |
|
import platform |
|
import shlex |
|
import subprocess |
|
import typing as T |
|
from contextlib import contextmanager |
|
from pathlib import Path |
|
|
|
if T.TYPE_CHECKING: |
|
from ._typing import StringProtocol, SizedStringProtocol |
|
|
|
"""This is (mostly) a standalone module used to write logging |
|
information about Meson runs. Some output goes to screen, |
|
some to logging dir and some goes to both.""" |
|
|
|
def _windows_ansi() -> bool: |
|
# windll only exists on windows, so mypy will get mad |
|
from ctypes import windll, byref # type: ignore |
|
from ctypes.wintypes import DWORD |
|
|
|
kernel = windll.kernel32 |
|
stdout = kernel.GetStdHandle(-11) |
|
mode = DWORD() |
|
if not kernel.GetConsoleMode(stdout, byref(mode)): |
|
return False |
|
# ENABLE_VIRTUAL_TERMINAL_PROCESSING == 0x4 |
|
# If the call to enable VT processing fails (returns 0), we fallback to |
|
# original behavior |
|
return bool(kernel.SetConsoleMode(stdout, mode.value | 0x4) or os.environ.get('ANSICON')) |
|
|
|
def colorize_console() -> bool: |
|
_colorize_console = getattr(sys.stdout, 'colorize_console', None) # type: bool |
|
if _colorize_console is not None: |
|
return _colorize_console |
|
|
|
try: |
|
if platform.system().lower() == 'windows': |
|
_colorize_console = os.isatty(sys.stdout.fileno()) and _windows_ansi() |
|
else: |
|
_colorize_console = os.isatty(sys.stdout.fileno()) and os.environ.get('TERM', 'dumb') != 'dumb' |
|
except Exception: |
|
_colorize_console = False |
|
|
|
sys.stdout.colorize_console = _colorize_console # type: ignore[attr-defined] |
|
return _colorize_console |
|
|
|
def setup_console() -> None: |
|
# on Windows, a subprocess might call SetConsoleMode() on the console |
|
# connected to stdout and turn off ANSI escape processing. Call this after |
|
# running a subprocess to ensure we turn it on again. |
|
if platform.system().lower() == 'windows': |
|
try: |
|
delattr(sys.stdout, 'colorize_console') |
|
except AttributeError: |
|
pass |
|
|
|
log_dir = None # type: T.Optional[str] |
|
log_file = None # type: T.Optional[T.TextIO] |
|
log_fname = 'meson-log.txt' # type: str |
|
log_depth = [] # type: T.List[str] |
|
log_timestamp_start = None # type: T.Optional[float] |
|
log_fatal_warnings = False # type: bool |
|
log_disable_stdout = False # type: bool |
|
log_errors_only = False # type: bool |
|
_in_ci = 'CI' in os.environ # type: bool |
|
_logged_once = set() # type: T.Set[T.Tuple[str, ...]] |
|
log_warnings_counter = 0 # type: int |
|
log_pager: T.Optional['subprocess.Popen'] = None |
|
|
|
def disable() -> None: |
|
global log_disable_stdout |
|
log_disable_stdout = True |
|
|
|
def enable() -> None: |
|
global log_disable_stdout |
|
log_disable_stdout = False |
|
|
|
def set_quiet() -> None: |
|
global log_errors_only |
|
log_errors_only = True |
|
|
|
def set_verbose() -> None: |
|
global log_errors_only |
|
log_errors_only = False |
|
|
|
def initialize(logdir: str, fatal_warnings: bool = False) -> None: |
|
global log_dir, log_file, log_fatal_warnings |
|
log_dir = logdir |
|
log_file = open(os.path.join(logdir, log_fname), 'w', encoding='utf-8') |
|
log_fatal_warnings = fatal_warnings |
|
|
|
def set_timestamp_start(start: float) -> None: |
|
global log_timestamp_start |
|
log_timestamp_start = start |
|
|
|
def shutdown() -> T.Optional[str]: |
|
global log_file |
|
if log_file is not None: |
|
path = log_file.name |
|
exception_around_goer = log_file |
|
log_file = None |
|
exception_around_goer.close() |
|
return path |
|
stop_pager() |
|
return None |
|
|
|
class AnsiDecorator: |
|
plain_code = "\033[0m" |
|
|
|
def __init__(self, text: str, code: str, quoted: bool = False): |
|
self.text = text |
|
self.code = code |
|
self.quoted = quoted |
|
|
|
def get_text(self, with_codes: bool) -> str: |
|
text = self.text |
|
if with_codes and self.code: |
|
text = self.code + self.text + AnsiDecorator.plain_code |
|
if self.quoted: |
|
text = f'"{text}"' |
|
return text |
|
|
|
def __len__(self) -> int: |
|
return len(self.text) |
|
|
|
def __str__(self) -> str: |
|
return self.get_text(colorize_console()) |
|
|
|
TV_Loggable = T.Union[str, AnsiDecorator, 'StringProtocol'] |
|
TV_LoggableList = T.List[TV_Loggable] |
|
|
|
class AnsiText: |
|
def __init__(self, *args: 'SizedStringProtocol'): |
|
self.args = args |
|
|
|
def __len__(self) -> int: |
|
return sum(len(x) for x in self.args) |
|
|
|
def __str__(self) -> str: |
|
return ''.join(str(x) for x in self.args) |
|
|
|
|
|
def bold(text: str, quoted: bool = False) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1m", quoted=quoted) |
|
|
|
def italic(text: str, quoted: bool = False) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[3m", quoted=quoted) |
|
|
|
def plain(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "") |
|
|
|
def red(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1;31m") |
|
|
|
def green(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1;32m") |
|
|
|
def yellow(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1;33m") |
|
|
|
def blue(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1;34m") |
|
|
|
def cyan(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[1;36m") |
|
|
|
def normal_red(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[31m") |
|
|
|
def normal_green(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[32m") |
|
|
|
def normal_yellow(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[33m") |
|
|
|
def normal_blue(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[34m") |
|
|
|
def normal_cyan(text: str) -> AnsiDecorator: |
|
return AnsiDecorator(text, "\033[36m") |
|
|
|
# This really should be AnsiDecorator or anything that implements |
|
# __str__(), but that requires protocols from typing_extensions |
|
def process_markup(args: T.Sequence[TV_Loggable], keep: bool) -> T.List[str]: |
|
arr = [] # type: T.List[str] |
|
if log_timestamp_start is not None: |
|
arr = ['[{:.3f}]'.format(time.monotonic() - log_timestamp_start)] |
|
for arg in args: |
|
if arg is None: |
|
continue |
|
if isinstance(arg, str): |
|
arr.append(arg) |
|
elif isinstance(arg, AnsiDecorator): |
|
arr.append(arg.get_text(keep)) |
|
else: |
|
arr.append(str(arg)) |
|
return arr |
|
|
|
def force_print(*args: str, nested: bool, **kwargs: T.Any) -> None: |
|
if log_disable_stdout: |
|
return |
|
iostr = io.StringIO() |
|
kwargs['file'] = iostr |
|
print(*args, **kwargs) |
|
|
|
raw = iostr.getvalue() |
|
if log_depth: |
|
prepend = log_depth[-1] + '| ' if nested else '' |
|
lines = [] |
|
for l in raw.split('\n'): |
|
l = l.strip() |
|
lines.append(prepend + l if l else '') |
|
raw = '\n'.join(lines) |
|
|
|
# _Something_ is going to get printed. |
|
try: |
|
output = log_pager.stdin if log_pager else None |
|
print(raw, end='', file=output) |
|
except UnicodeEncodeError: |
|
cleaned = raw.encode('ascii', 'replace').decode('ascii') |
|
print(cleaned, end='') |
|
|
|
# We really want a heterogeneous dict for this, but that's in typing_extensions |
|
def debug(*args: TV_Loggable, **kwargs: T.Any) -> None: |
|
arr = process_markup(args, False) |
|
if log_file is not None: |
|
print(*arr, file=log_file, **kwargs) |
|
log_file.flush() |
|
|
|
def _debug_log_cmd(cmd: str, args: T.List[str]) -> None: |
|
if not _in_ci: |
|
return |
|
args = [f'"{x}"' for x in args] # Quote all args, just in case |
|
debug('!meson_ci!/{} {}'.format(cmd, ' '.join(args))) |
|
|
|
def cmd_ci_include(file: str) -> None: |
|
_debug_log_cmd('ci_include', [file]) |
|
|
|
|
|
def log(*args: TV_Loggable, is_error: bool = False, |
|
once: bool = False, **kwargs: T.Any) -> None: |
|
if once: |
|
log_once(*args, is_error=is_error, **kwargs) |
|
else: |
|
_log(*args, is_error=is_error, **kwargs) |
|
|
|
|
|
def _log(*args: TV_Loggable, is_error: bool = False, |
|
**kwargs: T.Any) -> None: |
|
nested = kwargs.pop('nested', True) |
|
arr = process_markup(args, False) |
|
if log_file is not None: |
|
print(*arr, file=log_file, **kwargs) |
|
log_file.flush() |
|
if colorize_console(): |
|
arr = process_markup(args, True) |
|
if not log_errors_only or is_error: |
|
force_print(*arr, nested=nested, **kwargs) |
|
|
|
def log_once(*args: TV_Loggable, is_error: bool = False, |
|
**kwargs: T.Any) -> None: |
|
"""Log variant that only prints a given message one time per meson invocation. |
|
|
|
This considers ansi decorated values by the values they wrap without |
|
regard for the AnsiDecorator itself. |
|
""" |
|
def to_str(x: TV_Loggable) -> str: |
|
if isinstance(x, str): |
|
return x |
|
if isinstance(x, AnsiDecorator): |
|
return x.text |
|
return str(x) |
|
t = tuple(to_str(a) for a in args) |
|
if t in _logged_once: |
|
return |
|
_logged_once.add(t) |
|
_log(*args, is_error=is_error, **kwargs) |
|
|
|
# This isn't strictly correct. What we really want here is something like: |
|
# class StringProtocol(typing_extensions.Protocol): |
|
# |
|
# def __str__(self) -> str: ... |
|
# |
|
# This would more accurately embody what this function can handle, but we |
|
# don't have that yet, so instead we'll do some casting to work around it |
|
def get_error_location_string(fname: str, lineno: int) -> str: |
|
return f'{fname}:{lineno}:' |
|
|
|
def _log_error(severity: str, *rargs: TV_Loggable, |
|
once: bool = False, fatal: bool = True, **kwargs: T.Any) -> None: |
|
from .mesonlib import MesonException, relpath |
|
|
|
# The typing requirements here are non-obvious. Lists are invariant, |
|
# therefore T.List[A] and T.List[T.Union[A, B]] are not able to be joined |
|
if severity == 'notice': |
|
label = [bold('NOTICE:')] # type: TV_LoggableList |
|
elif severity == 'warning': |
|
label = [yellow('WARNING:')] |
|
elif severity == 'error': |
|
label = [red('ERROR:')] |
|
elif severity == 'deprecation': |
|
label = [red('DEPRECATION:')] |
|
else: |
|
raise MesonException('Invalid severity ' + severity) |
|
# rargs is a tuple, not a list |
|
args = label + list(rargs) |
|
|
|
location = kwargs.pop('location', None) |
|
if location is not None: |
|
location_file = relpath(location.filename, os.getcwd()) |
|
location_str = get_error_location_string(location_file, location.lineno) |
|
# Unions are frankly awful, and we have to T.cast here to get mypy |
|
# to understand that the list concatenation is safe |
|
location_list = T.cast('TV_LoggableList', [location_str]) |
|
args = location_list + args |
|
|
|
log(*args, once=once, **kwargs) |
|
|
|
global log_warnings_counter |
|
log_warnings_counter += 1 |
|
|
|
if log_fatal_warnings and fatal: |
|
raise MesonException("Fatal warnings enabled, aborting") |
|
|
|
def error(*args: TV_Loggable, **kwargs: T.Any) -> None: |
|
return _log_error('error', *args, **kwargs, is_error=True) |
|
|
|
def warning(*args: TV_Loggable, **kwargs: T.Any) -> None: |
|
return _log_error('warning', *args, **kwargs, is_error=True) |
|
|
|
def deprecation(*args: TV_Loggable, **kwargs: T.Any) -> None: |
|
return _log_error('deprecation', *args, **kwargs, is_error=True) |
|
|
|
def notice(*args: TV_Loggable, **kwargs: T.Any) -> None: |
|
return _log_error('notice', *args, **kwargs, is_error=False) |
|
|
|
def get_relative_path(target: Path, current: Path) -> Path: |
|
"""Get the path to target from current""" |
|
# Go up "current" until we find a common ancestor to target |
|
acc = ['.'] |
|
for part in [current, *current.parents]: |
|
try: |
|
path = target.relative_to(part) |
|
return Path(*acc, path) |
|
except ValueError: |
|
pass |
|
acc += ['..'] |
|
|
|
# we failed, should not get here |
|
return target |
|
|
|
def exception(e: Exception, prefix: T.Optional[AnsiDecorator] = None) -> None: |
|
if prefix is None: |
|
prefix = red('ERROR:') |
|
log() |
|
args = [] # type: T.List[T.Union[AnsiDecorator, str]] |
|
if all(getattr(e, a, None) is not None for a in ['file', 'lineno', 'colno']): |
|
# Mypy doesn't follow hasattr, and it's pretty easy to visually inspect |
|
# that this is correct, so we'll just ignore it. |
|
path = get_relative_path(Path(e.file), Path(os.getcwd())) # type: ignore |
|
args.append(f'{path}:{e.lineno}:{e.colno}:') # type: ignore |
|
if prefix: |
|
args.append(prefix) |
|
args.append(str(e)) |
|
log(*args) |
|
|
|
# Format a list for logging purposes as a string. It separates |
|
# all but the last item with commas, and the last with 'and'. |
|
def format_list(input_list: T.List[str]) -> str: |
|
l = len(input_list) |
|
if l > 2: |
|
return ' and '.join([', '.join(input_list[:-1]), input_list[-1]]) |
|
elif l == 2: |
|
return ' and '.join(input_list) |
|
elif l == 1: |
|
return input_list[0] |
|
else: |
|
return '' |
|
|
|
@contextmanager |
|
def nested(name: str = '') -> T.Generator[None, None, None]: |
|
log_depth.append(name) |
|
try: |
|
yield |
|
finally: |
|
log_depth.pop() |
|
|
|
def start_pager() -> None: |
|
if not colorize_console(): |
|
return |
|
if 'PAGER' in os.environ: |
|
pager_cmd = shlex.split(os.environ['PAGER']) |
|
else: |
|
# "R" : support color |
|
# "X" : do not clear the screen when leaving the pager |
|
# "F" : skip the pager if content fit into the screen |
|
pager_cmd = ['less', '-RXF'] |
|
global log_pager |
|
assert log_pager is None |
|
try: |
|
log_pager = subprocess.Popen(pager_cmd, stdin=subprocess.PIPE, |
|
text=True, encoding='utf-8') |
|
except Exception as e: |
|
# Ignore errors, unless it is a user defined pager. |
|
if 'PAGER' in os.environ: |
|
from .mesonlib import MesonException |
|
raise MesonException(f'Failed to start pager: {str(e)}') |
|
|
|
def stop_pager() -> None: |
|
global log_pager |
|
if log_pager: |
|
try: |
|
log_pager.stdin.flush() |
|
log_pager.stdin.close() |
|
except BrokenPipeError: |
|
pass |
|
log_pager.wait() |
|
log_pager = None
|
|
|