The Meson Build System

964 lines
37 KiB

#!/usr/bin/env python3
# Copyright 2016 The Meson development team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This class contains the basic functionality needed to run any interpreter
# or an interpreter-based tool.
# This tool is used to manipulate an existing Meson build definition.
# - add a file to a target
# - remove files from a target
# - move targets
# - reindent?
from .ast import IntrospectionInterpreter, build_target_functions, AstConditionLevel, AstIDGenerator, AstIndentationGenerator, AstPrinter
from mesonbuild.mesonlib import MesonException
from . import mlog, environment
from functools import wraps
from typing import List, Dict, Optional
from .mparser import Token, ArrayNode, ArgumentNode, AssignmentNode, BaseNode, BooleanNode, ElementaryNode, IdNode, FunctionNode, StringNode
import json, os, re, sys
class RewriterException(MesonException):
    """Raised for malformed rewriter commands (missing key, wrong type, unknown command)."""
def add_arguments(parser, formatter=None):
    """Register the rewriter command line interface on ``parser``.

    Adds the global options and one sub-command each for ``target``,
    ``kwargs``, ``default-options`` and ``command``. The selected sub-command
    is stored in ``options.type``.

    :param parser: the ``argparse`` parser to extend.
    :param formatter: optional help formatter class used for every sub-parser.
    """
    # argparse keeps an explicit ``formatter_class=None`` verbatim and then fails
    # as soon as it has to render usage/help, so only forward a real formatter.
    sub_kwargs = {} if formatter is None else {'formatter_class': formatter}

    parser.add_argument('-s', '--sourcedir', type=str, default='.', metavar='SRCDIR', help='Path to source directory.')
    parser.add_argument('-V', '--verbose', action='store_true', default=False, help='Enable verbose output')
    parser.add_argument('-S', '--skip-errors', dest='skip', action='store_true', default=False, help='Skip errors instead of aborting')
    subparsers = parser.add_subparsers(dest='type', title='Rewriter commands', description='Rewrite command to execute')

    # Target
    tgt_parser = subparsers.add_parser('target', help='Modify a target', **sub_kwargs)
    tgt_parser.add_argument('-s', '--subdir', default='', dest='subdir', help='Subdirectory of the new target (only for the "add_target" action)')
    tgt_parser.add_argument('--type', dest='tgt_type', choices=rewriter_keys['target']['target_type'][2], default='executable',
                            help='Type of the target to add (only for the "add_target" action)')
    tgt_parser.add_argument('target', help='Name or ID of the target')
    tgt_parser.add_argument('operation', choices=['add', 'rm', 'add_target', 'rm_target', 'info'],
                            help='Action to execute')
    tgt_parser.add_argument('sources', nargs='*', help='Sources to add/remove')

    # Keyword arguments
    kw_parser = subparsers.add_parser('kwargs', help='Modify keyword arguments', **sub_kwargs)
    kw_parser.add_argument('operation', choices=rewriter_keys['kwargs']['operation'][2],
                           help='Action to execute')
    kw_parser.add_argument('function', choices=list(rewriter_func_kwargs.keys()),
                           help='Function type to modify')
    kw_parser.add_argument('id', help='ID of the function to modify (can be anything for "project")')
    kw_parser.add_argument('kwargs', nargs='*', help='Pairs of keyword and value')

    # Default options
    def_parser = subparsers.add_parser('default-options', help='Modify the project default options', **sub_kwargs)
    def_parser.add_argument('operation', choices=rewriter_keys['default_options']['operation'][2],
                            help='Action to execute')
    def_parser.add_argument('options', nargs='*', help='Key, value pairs of configuration option')

    # JSON file/command
    cmd_parser = subparsers.add_parser('command', help='Execute a JSON array of commands', **sub_kwargs)
    cmd_parser.add_argument('json', help='JSON string or file to execute')
class RequiredKeys:
    """Decorator validating the command dictionary passed to a handler.

    ``keys`` maps each expected key to a ``(type, default, choices)`` tuple:
    a ``default`` of ``None`` marks the key as required, a ``choices`` of
    ``None`` accepts every value of the right type.
    """

    def __init__(self, keys):
        self.keys = keys

    def __call__(self, f):
        import copy

        @wraps(f)
        def wrapped(*wrapped_args, **wrapped_kwargs):
            # The command is the second positional argument (after ``self``).
            assert(len(wrapped_args) >= 2)
            cmd = wrapped_args[1]
            for key, val in self.keys.items():
                typ = val[0] # The type of the value
                default = val[1] # The default value -- None is required
                choices = val[2] # Valid choices -- None is for everything
                if key not in cmd:
                    if default is not None:
                        # Copy so that a mutable default ({} / []) is never
                        # shared between independent commands.
                        cmd[key] = copy.copy(default)
                    else:
                        raise RewriterException('Key "{}" is missing in object for {}'
                                                .format(key, f.__name__))
                if not isinstance(cmd[key], typ):
                    raise RewriterException('Invalid type of "{}". Required is {} but provided was {}'
                                            .format(key, typ.__name__, type(cmd[key]).__name__))
                if choices is not None:
                    assert(isinstance(choices, list))
                    if cmd[key] not in choices:
                        raise RewriterException('Invalid value of "{}": Possible values are {} but provided was "{}"'
                                                .format(key, choices, cmd[key]))
            return f(*wrapped_args, **wrapped_kwargs)

        return wrapped
class MTypeBase:
    """Base class of the typed wrappers used to modify one keyword-argument node.

    ``node_type`` is set to the entry of :meth:`supported_nodes` that matches
    the wrapped node, or stays ``None`` when the node cannot be handled.
    """

    def __init__(self, node: Optional[BaseNode] = None):
        if node is None:
            self.node = self._new_node() # lgtm [py/init-calls-subclass] (node creation does not depend on base class state)
        else:
            self.node = node
        self.node_type = None
        for i in self.supported_nodes(): # lgtm [py/init-calls-subclass] (listing nodes does not depend on base class state)
            if isinstance(self.node, i):
                self.node_type = i

    def _new_node(self):
        # Overwrite in derived class
        return BaseNode()

    def can_modify(self):
        """Return True when the wrapped node is of a supported type."""
        return self.node_type is not None

    def get_node(self):
        """Return the (possibly modified) node."""
        return self.node

    def supported_nodes(self):
        # Overwrite in derived class
        return []

    def set_value(self, value):
        # Overwrite in derived class
        mlog.warning('Cannot set the value of type', mlog.bold(type(self).__name__), '--> skipping')

    def add_value(self, value):
        # Overwrite in derived class
        mlog.warning('Cannot add a value of type', mlog.bold(type(self).__name__), '--> skipping')

    def remove_value(self, value):
        # Overwrite in derived class
        mlog.warning('Cannot remove a value of type', mlog.bold(type(self).__name__), '--> skipping')

    def remove_regex(self, value):
        # Overwrite in derived class
        mlog.warning('Cannot remove a regex in type', mlog.bold(type(self).__name__), '--> skipping')
class MTypeStr(MTypeBase):
    """Wrapper for keyword arguments holding a single string."""

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_node(self):
        return StringNode(Token('', '', 0, 0, 0, None, ''))

    def supported_nodes(self):
        return [StringNode]

    def set_value(self, value):
        self.node.value = str(value)
class MTypeBool(MTypeBase):
    """Wrapper for keyword arguments holding a boolean."""

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_node(self):
        # NOTE(review): this builds a StringNode although supported_nodes()
        # only lists BooleanNode, so a freshly created node presumably never
        # passes can_modify(). Switching to BooleanNode needs its constructor
        # signature, which is not visible from this file -- confirm and fix.
        return StringNode(Token('', '', 0, 0, 0, None, False))

    def supported_nodes(self):
        return [BooleanNode]

    def set_value(self, value):
        self.node.value = bool(value)
class MTypeID(MTypeBase):
    """Wrapper for keyword arguments holding a single identifier."""

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_node(self):
        # Must be an IdNode: it is the only type listed in supported_nodes(),
        # so any other node type would make a new value unmodifiable.
        return IdNode(Token('', '', 0, 0, 0, None, ''))

    def supported_nodes(self):
        return [IdNode]

    def set_value(self, value):
        self.node.value = str(value)
class MTypeList(MTypeBase):
    """Wrapper for keyword arguments holding a list of elementary values.

    The wrapped node is either an ``ArrayNode`` or a single element node; a
    single element is promoted to an array before any modification and
    :meth:`get_node` unwraps a one-element array again.
    """

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_node(self):
        return ArrayNode(ArgumentNode(Token('', '', 0, 0, 0, None, '')), 0, 0, 0, 0)

    def _new_element_node(self, value):
        # Overwrite in derived class
        return BaseNode()

    def _ensure_array_node(self):
        """Wrap a lone element node into a new array node."""
        if not isinstance(self.node, ArrayNode):
            tmp = self.node
            self.node = self._new_node()
            self.node.args.arguments += [tmp]

    def _check_is_equal(self, node, value) -> bool:
        # Overwrite in derived class
        return False

    def _check_regex_matches(self, node, regex: str) -> bool:
        # Overwrite in derived class
        return False

    def get_node(self):
        """Return the node, collapsing a one-element array to its element."""
        if isinstance(self.node, ArrayNode):
            if len(self.node.args.arguments) == 1:
                return self.node.args.arguments[0]
        return self.node

    def supported_element_nodes(self):
        # Overwrite in derived class
        return []

    def supported_nodes(self):
        return [ArrayNode] + self.supported_element_nodes()

    def set_value(self, value):
        """Replace all elements with ``value`` (a single value or a list)."""
        if not isinstance(value, list):
            value = [value]
        self._ensure_array_node()
        self.node.args.arguments = [] # Remove all current nodes
        for i in value:
            self.node.args.arguments += [self._new_element_node(i)]

    def add_value(self, value):
        """Append ``value`` (a single value or a list) to the elements."""
        if not isinstance(value, list):
            value = [value]
        self._ensure_array_node()
        for i in value:
            self.node.args.arguments += [self._new_element_node(i)]

    def _remove_helper(self, value, equal_func):
        """Drop every element for which ``equal_func(element, v)`` holds for some ``v`` in ``value``."""
        def check_remove_node(node):
            # Compare the node that was passed in, not a variable captured
            # from the enclosing loop.
            for j in value:
                if equal_func(node, j):
                    return True
            return False

        if not isinstance(value, list):
            value = [value]
        self._ensure_array_node()
        removed_list = []
        for i in self.node.args.arguments:
            if not check_remove_node(i):
                removed_list += [i]
        self.node.args.arguments = removed_list

    def remove_value(self, value):
        self._remove_helper(value, self._check_is_equal)

    def remove_regex(self, regex: str):
        self._remove_helper(regex, self._check_regex_matches)
class MTypeStrList(MTypeList):
    """List wrapper whose elements are string nodes."""

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_element_node(self, value):
        return StringNode(Token('', '', 0, 0, 0, None, str(value)))

    def _check_is_equal(self, node, value) -> bool:
        if isinstance(node, StringNode):
            return node.value == value
        return False

    def _check_regex_matches(self, node, regex: str) -> bool:
        # re.match anchors at the start of the element value only.
        if isinstance(node, StringNode):
            return re.match(regex, node.value) is not None
        return False

    def supported_element_nodes(self):
        return [StringNode]
class MTypeIDList(MTypeList):
    """List wrapper whose elements are identifier nodes."""

    def __init__(self, node: Optional[BaseNode] = None):
        super().__init__(node)

    def _new_element_node(self, value):
        return IdNode(Token('', '', 0, 0, 0, None, str(value)))

    def _check_is_equal(self, node, value) -> bool:
        if isinstance(node, IdNode):
            return node.value == value
        return False

    def _check_regex_matches(self, node, regex: str) -> bool:
        # Elements of this list are IdNodes (see supported_element_nodes), so
        # that is the type that has to be tested here.
        if isinstance(node, IdNode):
            return re.match(regex, node.value) is not None
        return False

    def supported_element_nodes(self):
        return [IdNode]
# Schema of the command dictionaries, per command type. Every entry is a
# (type, default, choices) tuple as consumed by RequiredKeys: a default of
# None marks the key as required, choices of None accept any value.
rewriter_keys = {
    'default_options': {
        'operation': (str, None, ['set', 'delete']),
        'options': (dict, {}, None)
    },
    'kwargs': {
        'function': (str, None, None),
        'id': (str, None, None),
        'operation': (str, None, ['set', 'delete', 'add', 'remove', 'remove_regex', 'info']),
        'kwargs': (dict, {}, None)
    },
    'target': {
        'target': (str, None, None),
        'operation': (str, None, ['src_add', 'src_rm', 'target_rm', 'target_add', 'info']),
        'sources': (list, [], None),
        'subdir': (str, '', None),
        'target_type': (str, 'executable', ['both_libraries', 'executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library']),
    }
}
# Keyword arguments that can be modified, per function type, together with
# the MType* wrapper class used to edit the corresponding AST node.
rewriter_func_kwargs = {
    'dependency': {
        'language': MTypeStr,
        'method': MTypeStr,
        'native': MTypeBool,
        'not_found_message': MTypeStr,
        'required': MTypeBool,
        'static': MTypeBool,
        'version': MTypeStrList,
        'modules': MTypeStrList
    },
    'target': {
        'build_by_default': MTypeBool,
        'build_rpath': MTypeStr,
        'dependencies': MTypeIDList,
        'gui_app': MTypeBool,
        'link_with': MTypeIDList,
        'export_dynamic': MTypeBool,
        'implib': MTypeBool,
        'install': MTypeBool,
        'install_dir': MTypeStr,
        'install_rpath': MTypeStr,
        'pie': MTypeBool
    },
    'project': {
        'default_options': MTypeStrList,
        'meson_version': MTypeStr,
        'license': MTypeStrList,
        'subproject_dir': MTypeStr,
        'version': MTypeStr
    }
}
class Rewriter:
    """Apply rewrite commands to the AST of a Meson project and write them back.

    Commands are dictionaries dispatched on their ``type`` key by
    :meth:`process`. Handlers record the AST nodes they changed, removed or
    created; :meth:`apply_changes` turns those into text edits of the build
    files. Data collected by ``info`` operations is emitted by
    :meth:`print_info`.
    """

    def __init__(self, sourcedir: str, generator: str = 'ninja', skip_errors: bool = False):
        self.sourcedir = sourcedir
        self.interpreter = IntrospectionInterpreter(sourcedir, '', generator, visitors=[AstIDGenerator(), AstIndentationGenerator(), AstConditionLevel()])
        self.skip_errors = skip_errors
        self.modefied_nodes = []
        self.to_remove_nodes = []
        self.to_add_nodes = []
        # Command type -> handler
        self.functions = {
            'default_options': self.process_default_options,
            'kwargs': self.process_kwargs,
            'target': self.process_target,
        }
        self.info_dump = None

    def analyze_meson(self):
        """Run the introspection interpreter and log the project name and version."""
        mlog.log('Analyzing meson file:', mlog.bold(os.path.join(self.sourcedir, environment.build_filename)))
        # NOTE(review): the analyze() call is restored; project_data is read
        # right below, so the interpreter has to run first -- confirm the
        # method name against IntrospectionInterpreter.
        self.interpreter.analyze()
        mlog.log(' -- Project:', mlog.bold(self.interpreter.project_data['descriptive_name']))
        mlog.log(' -- Version:', mlog.cyan(self.interpreter.project_data['version']))

    def add_info(self, cmd_type: str, cmd_id: str, data: dict):
        """Store ``data`` under ``info_dump[cmd_type][cmd_id]``."""
        if self.info_dump is None:
            self.info_dump = {}
        if cmd_type not in self.info_dump:
            self.info_dump[cmd_type] = {}
        self.info_dump[cmd_type][cmd_id] = data

    def print_info(self):
        """Write the collected info as JSON to stderr; does nothing when no info was collected."""
        if self.info_dump is None:
            return
        sys.stderr.write(json.dumps(self.info_dump, indent=2))

    def on_error(self):
        """Return the log suffix describing what happens after an error."""
        if self.skip_errors:
            return mlog.cyan('-->'), mlog.yellow('skipping')
        return mlog.cyan('-->'), mlog.red('aborting')

    def handle_error(self):
        """Return None when errors are skipped, raise MesonException otherwise."""
        if self.skip_errors:
            return None
        raise MesonException('Rewriting the meson.build failed')

    def find_target(self, target: str):
        """Look up a target by name or ID, falling back to the variable it was assigned to.

        Returns the target's introspection entry, or None when there is no
        match or the name is ambiguous.
        """
        def check_list(name: str) -> List[dict]:
            result = []
            for i in self.interpreter.targets:
                if name == i['name'] or name == i['id']:
                    result += [i]
            return result

        targets = check_list(target)
        if targets:
            if len(targets) == 1:
                return targets[0]
            else:
                mlog.error('There are multiple targets matching', mlog.bold(target))
                for i in targets:
                    mlog.error(' -- Target name', mlog.bold(i['name']), 'with ID', mlog.bold(i['id']))
                mlog.error('Please try again with the unique ID of the target', *self.on_error())
                self.handle_error()
                return None

        # Check the assignments
        tgt = None
        if target in self.interpreter.assignments:
            node = self.interpreter.assignments[target][0]
            if isinstance(node, FunctionNode):
                if node.func_name in ['executable', 'jar', 'library', 'shared_library', 'shared_module', 'static_library', 'both_libraries']:
                    tgt = self.interpreter.assign_vals[target][0]

        return tgt

    def find_dependency(self, dependency: str):
        """Look up a dependency by name, falling back to the variable it was assigned to."""
        def check_list(name: str):
            for i in self.interpreter.dependencies:
                if name == i['name']:
                    return i
            return None

        dep = check_list(dependency)
        if dep is not None:
            return dep

        # Check the assignments
        if dependency in self.interpreter.assignments:
            node = self.interpreter.assignments[dependency][0]
            if isinstance(node, FunctionNode):
                if node.func_name in ['dependency']:
                    name = self.interpreter.flatten_args(node.args)[0]
                    dep = check_list(name)

        return dep

    @RequiredKeys(rewriter_keys['default_options'])
    def process_default_options(self, cmd):
        """Set or delete entries of the project's ``default_options`` kwarg.

        Implemented on top of :meth:`process_kwargs`: existing ``key=...``
        entries are removed first and, for ``set``, the validated new values
        are added afterwards.
        """
        # First, remove the old values
        kwargs_cmd = {
            'function': 'project',
            'id': "/",
            'operation': 'remove_regex',
            'kwargs': {
                'default_options': ['{}=.*'.format(x) for x in cmd['options'].keys()]
            }
        }
        self.process_kwargs(kwargs_cmd)

        # Then add the new values
        if cmd['operation'] != 'set':
            return

        kwargs_cmd['operation'] = 'add'
        kwargs_cmd['kwargs']['default_options'] = []

        cdata = self.interpreter.coredata
        # NOTE(review): the coredata attribute names in this table are restored
        # from memory of the upstream tool and depend on the Meson version --
        # verify them against the coredata module before merging.
        options = {
            **cdata.builtins,
            **cdata.builtins_per_machine.host,
            **{'build.' + k: o for k, o in cdata.builtins_per_machine.build.items()},
            **cdata.backend_options,
            **cdata.base_options,
            **cdata.compiler_options.host,
            **{'build.' + k: o for k, o in cdata.compiler_options.build.items()},
            **cdata.user_options,
        }

        for key, val in sorted(cmd['options'].items()):
            if key not in options:
                mlog.error('Unknown options', mlog.bold(key), *self.on_error())
                self.handle_error()
                continue

            try:
                val = options[key].validate_value(val)
            except MesonException as e:
                mlog.error('Unable to set', mlog.bold(key), mlog.red(str(e)), *self.on_error())
                self.handle_error()
                continue

            kwargs_cmd['kwargs']['default_options'] += ['{}={}'.format(key, val)]

        self.process_kwargs(kwargs_cmd)

    @RequiredKeys(rewriter_keys['kwargs'])
    def process_kwargs(self, cmd):
        """Inspect or modify the keyword arguments of a project/target/dependency call."""
        mlog.log('Processing function type', mlog.bold(cmd['function']), 'with id', mlog.cyan("'" + cmd['id'] + "'"))
        if cmd['function'] not in rewriter_func_kwargs:
            mlog.error('Unknown function type', cmd['function'], *self.on_error())
            return self.handle_error()
        kwargs_def = rewriter_func_kwargs[cmd['function']]

        # Find the function node to modify
        node = None
        arg_node = None
        if cmd['function'] == 'project':
            if cmd['id'] != '/':
                mlog.error('The ID for the function type project must be "/"', *self.on_error())
                return self.handle_error()
            node = self.interpreter.project_node
            arg_node = node.args
        elif cmd['function'] == 'target':
            tmp = self.find_target(cmd['id'])
            if tmp:
                node = tmp['node']
                arg_node = node.args
        elif cmd['function'] == 'dependency':
            tmp = self.find_dependency(cmd['id'])
            if tmp:
                node = tmp['node']
                arg_node = node.args
        if not node:
            mlog.error('Unable to find the function node')
        assert(isinstance(node, FunctionNode))
        assert(isinstance(arg_node, ArgumentNode))

        # Print kwargs info
        if cmd['operation'] == 'info':
            info_data = {}
            for key, val in sorted(arg_node.kwargs.items()):
                # Values that are neither elementary nor arrays are reported as None
                info_data[key] = None
                if isinstance(val, ElementaryNode):
                    info_data[key] = val.value
                elif isinstance(val, ArrayNode):
                    data_list = []
                    for i in val.args.arguments:
                        element = None
                        if isinstance(i, ElementaryNode):
                            element = i.value
                        data_list += [element]
                    info_data[key] = data_list

            self.add_info('kwargs', '{}#{}'.format(cmd['function'], cmd['id']), info_data)
            return # Nothing else to do

        # Modify the kwargs
        num_changed = 0
        for key, val in sorted(cmd['kwargs'].items()):
            if key not in kwargs_def:
                mlog.error('Cannot modify unknown kwarg', mlog.bold(key), *self.on_error())
                self.handle_error()
                continue

            # Remove the key from the kwargs
            if cmd['operation'] == 'delete':
                if key in arg_node.kwargs:
                    mlog.log(' -- Deleting', mlog.bold(key), 'from the kwargs')
                    del arg_node.kwargs[key]
                    num_changed += 1
                else:
                    mlog.log(' -- Key', mlog.bold(key), 'is already deleted')
                continue

            # A missing kwarg is passed as None, which makes the wrapper create
            # a fresh node; nothing is stored in the AST until the operation
            # succeeded, so a skipped key never leaves a None entry behind.
            modifyer = kwargs_def[key](arg_node.kwargs.get(key))
            if not modifyer.can_modify():
                mlog.log(' -- Skipping', mlog.bold(key), 'because it is to complex to modify')
                continue

            # Apply the operation
            val_str = str(val)
            if cmd['operation'] == 'set':
                mlog.log(' -- Setting', mlog.bold(key), 'to', mlog.yellow(val_str))
                modifyer.set_value(val)
            elif cmd['operation'] == 'add':
                mlog.log(' -- Adding', mlog.yellow(val_str), 'to', mlog.bold(key))
                modifyer.add_value(val)
            elif cmd['operation'] == 'remove':
                mlog.log(' -- Removing', mlog.yellow(val_str), 'from', mlog.bold(key))
                modifyer.remove_value(val)
            elif cmd['operation'] == 'remove_regex':
                mlog.log(' -- Removing all values matching', mlog.yellow(val_str), 'from', mlog.bold(key))
                modifyer.remove_regex(val)

            # Write back the result
            arg_node.kwargs[key] = modifyer.get_node()
            num_changed += 1

        if num_changed > 0 and node not in self.modefied_nodes:
            self.modefied_nodes += [node]

    def find_assignment_node(self, node: BaseNode) -> Optional[AssignmentNode]:
        """Return the assignment whose value is ``node``, or None if there is none."""
        if hasattr(node, 'ast_id') and node.ast_id in self.interpreter.reverse_assignment:
            return self.interpreter.reverse_assignment[node.ast_id]
        return None

    @RequiredKeys(rewriter_keys['target'])
    def process_target(self, cmd):
        """Add/remove sources of a target, add/remove a target, or report its sources."""
        mlog.log('Processing target', mlog.bold(cmd['target']), 'operation', mlog.cyan(cmd['operation']))
        target = self.find_target(cmd['target'])
        if target is None and cmd['operation'] != 'target_add':
            mlog.error('Unknown target', mlog.bold(cmd['target']), *self.on_error())
            return self.handle_error()

        # Make source paths relative to the current subdir
        def rel_source(src: str) -> str:
            subdir = os.path.abspath(os.path.join(self.sourcedir, target['subdir']))
            if os.path.isabs(src):
                return os.path.relpath(src, subdir)
            elif not os.path.exists(src):
                return src # Trust the user when the source doesn't exist
            # Make sure that the path is relative to the subdir
            return os.path.relpath(os.path.abspath(src), subdir)

        if target is not None:
            cmd['sources'] = [rel_source(x) for x in cmd['sources']]

        # Utility function to get a list of the sources from a node
        def arg_list_from_node(n):
            args = []
            if isinstance(n, FunctionNode):
                args = list(n.args.arguments)
                if n.func_name in build_target_functions:
                    # NOTE(review): restored statement -- presumably the first
                    # positional argument of a build target call is the target
                    # name rather than a source; confirm.
                    args.pop(0)
            elif isinstance(n, ArrayNode):
                args = n.args.arguments
            elif isinstance(n, ArgumentNode):
                args = n.arguments
            return args

        to_sort_nodes = []

        if cmd['operation'] == 'src_add':
            node = None
            if target['sources']:
                node = target['sources'][0]
            else:
                node = target['node']
            assert(node is not None)

            # Generate the current source list
            src_list = []
            for i in target['sources']:
                for j in arg_list_from_node(i):
                    if isinstance(j, StringNode):
                        src_list += [j.value]

            # Generate the new String nodes
            to_append = []
            for i in sorted(set(cmd['sources'])):
                if i in src_list:
                    mlog.log(' -- Source', mlog.green(i), 'is already defined for the target --> skipping')
                    continue
                mlog.log(' -- Adding source', mlog.green(i), 'at',
                         mlog.yellow('{}:{}'.format(os.path.join(node.subdir, environment.build_filename), node.lineno)))
                token = Token('string', node.subdir, 0, 0, 0, None, i)
                to_append += [StringNode(token)]

            # Append to the AST at the right place
            arg_node = None
            if isinstance(node, (FunctionNode, ArrayNode)):
                arg_node = node.args
            elif isinstance(node, ArgumentNode):
                arg_node = node
            assert(arg_node is not None)
            arg_node.arguments += to_append

            # Mark the node as modified
            if arg_node not in to_sort_nodes and not isinstance(node, FunctionNode):
                to_sort_nodes += [arg_node]
            if node not in self.modefied_nodes:
                self.modefied_nodes += [node]

        elif cmd['operation'] == 'src_rm':
            # Helper to find the exact string node and its parent
            def find_node(src):
                for i in target['sources']:
                    for j in arg_list_from_node(i):
                        if isinstance(j, StringNode):
                            if j.value == src:
                                return i, j
                return None, None

            for i in cmd['sources']:
                # Try to find the node with the source string
                root, string_node = find_node(i)
                if root is None:
                    mlog.warning(' -- Unable to find source', mlog.green(i), 'in the target')
                    continue

                # Remove the found string node from the argument list
                arg_node = None
                if isinstance(root, (FunctionNode, ArrayNode)):
                    arg_node = root.args
                elif isinstance(root, ArgumentNode):
                    arg_node = root
                assert(arg_node is not None)
                mlog.log(' -- Removing source', mlog.green(i), 'from',
                         mlog.yellow('{}:{}'.format(os.path.join(string_node.subdir, environment.build_filename), string_node.lineno)))
                arg_node.arguments.remove(string_node)

                # Mark the node as modified
                if arg_node not in to_sort_nodes and not isinstance(root, FunctionNode):
                    to_sort_nodes += [arg_node]
                if root not in self.modefied_nodes:
                    self.modefied_nodes += [root]

        elif cmd['operation'] == 'target_add':
            if target is not None:
                mlog.error('Can not add target', mlog.bold(cmd['target']), 'because it already exists', *self.on_error())
                return self.handle_error()

            id_base = re.sub(r'[- ]', '_', cmd['target'])
            # The parentheses matter: without them every non-executable target
            # would get the bare variable name '_lib'.
            target_id = id_base + ('_exe' if cmd['target_type'] == 'executable' else '_lib')
            source_id = id_base + '_sources'

            # Build src list
            src_arg_node = ArgumentNode(Token('string', cmd['subdir'], 0, 0, 0, None, ''))
            src_arr_node = ArrayNode(src_arg_node, 0, 0, 0, 0)
            src_far_node = ArgumentNode(Token('string', cmd['subdir'], 0, 0, 0, None, ''))
            src_fun_node = FunctionNode(cmd['subdir'], 0, 0, 0, 0, 'files', src_far_node)
            src_ass_node = AssignmentNode(cmd['subdir'], 0, 0, source_id, src_fun_node)
            src_arg_node.arguments = [StringNode(Token('string', cmd['subdir'], 0, 0, 0, None, x)) for x in cmd['sources']]
            src_far_node.arguments = [src_arr_node]

            # Build target
            tgt_arg_node = ArgumentNode(Token('string', cmd['subdir'], 0, 0, 0, None, ''))
            tgt_fun_node = FunctionNode(cmd['subdir'], 0, 0, 0, 0, cmd['target_type'], tgt_arg_node)
            tgt_ass_node = AssignmentNode(cmd['subdir'], 0, 0, target_id, tgt_fun_node)
            tgt_arg_node.arguments = [
                StringNode(Token('string', cmd['subdir'], 0, 0, 0, None, cmd['target'])),
                IdNode(Token('string', cmd['subdir'], 0, 0, 0, None, source_id))
            ]

            self.to_add_nodes += [src_ass_node, tgt_ass_node]

        elif cmd['operation'] == 'target_rm':
            # Prefer removing the whole assignment over the bare function call
            to_remove = self.find_assignment_node(target['node'])
            if to_remove is None:
                to_remove = target['node']
            self.to_remove_nodes += [to_remove]
            mlog.log(' -- Removing target', mlog.green(cmd['target']), 'at',
                     mlog.yellow('{}:{}'.format(os.path.join(to_remove.subdir, environment.build_filename), to_remove.lineno)))

        elif cmd['operation'] == 'info':
            # List all sources in the target
            src_list = []
            for i in target['sources']:
                for j in arg_list_from_node(i):
                    if isinstance(j, StringNode):
                        src_list += [j.value]
            test_data = {
                'name': target['name'],
                'sources': src_list
            }
            self.add_info('target', target['id'], test_data)

        # Sort files: natural (digit-aware, case-insensitive) order per path
        # component; non-string arguments keep their order and come first.
        def convert(text):
            return int(text) if text.isdigit() else text.lower()

        def alphanum_key(key):
            return [convert(c) for c in re.split('([0-9]+)', key)]

        def path_sorter(key):
            return [(key.count('/') <= idx, alphanum_key(x)) for idx, x in enumerate(key.split('/'))]

        for i in to_sort_nodes:
            unknown = [x for x in i.arguments if not isinstance(x, StringNode)]
            sources = [x for x in i.arguments if isinstance(x, StringNode)]
            sources = sorted(sources, key=lambda x: path_sorter(x.value))
            i.arguments = unknown + sources

    def process(self, cmd):
        """Dispatch ``cmd`` to the handler registered for ``cmd['type']``.

        Raises RewriterException when the type key is missing or unknown.
        """
        if 'type' not in cmd:
            raise RewriterException('Command has no key "type"')
        if cmd['type'] not in self.functions:
            raise RewriterException('Unknown command "{}". Supported commands are: {}'
                                    .format(cmd['type'], list(self.functions.keys())))
        self.functions[cmd['type']](cmd)

    def apply_changes(self):
        """Write all recorded node modifications, removals and additions to the build files."""
        assert(all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'subdir') for x in self.modefied_nodes))
        assert(all(hasattr(x, 'lineno') and hasattr(x, 'colno') and hasattr(x, 'subdir') for x in self.to_remove_nodes))
        assert(all(isinstance(x, (ArrayNode, FunctionNode)) for x in self.modefied_nodes))
        assert(all(isinstance(x, (ArrayNode, AssignmentNode, FunctionNode)) for x in self.to_remove_nodes))

        # Sort based on line and column in reversed order, so that an edit
        # never shifts the offsets of the edits still to be applied.
        work_nodes = [{'node': x, 'action': 'modify'} for x in self.modefied_nodes]
        work_nodes += [{'node': x, 'action': 'rm'} for x in self.to_remove_nodes]
        work_nodes = list(sorted(work_nodes, key=lambda x: (x['node'].lineno, x['node'].colno), reverse=True))
        work_nodes += [{'node': x, 'action': 'add'} for x in self.to_add_nodes]

        # Generating the new replacement string
        str_list = []
        for i in work_nodes:
            new_data = ''
            if i['action'] == 'modify' or i['action'] == 'add':
                printer = AstPrinter()
                # NOTE(review): the two calls below are restored from the AST
                # visitor API (accept / post_process) -- confirm against .ast.
                i['node'].accept(printer)
                printer.post_process()
                new_data = printer.result.strip()
            data = {
                'file': os.path.join(i['node'].subdir, environment.build_filename),
                'str': new_data,
                'node': i['node'],
                'action': i['action']
            }
            str_list += [data]

        # Load build files
        files = {}
        for i in str_list:
            if i['file'] in files:
                continue
            fpath = os.path.realpath(os.path.join(self.sourcedir, i['file']))
            fdata = ''
            # Create an empty file if it does not exist
            if not os.path.exists(fpath):
                with open(fpath, 'w', encoding='utf-8'):
                    pass
            with open(fpath, 'r', encoding='utf-8') as fp:
                fdata = fp.read()

            # Generate line offsets numbers
            m_lines = fdata.splitlines(True)
            offset = 0
            line_offsets = []
            for j in m_lines:
                line_offsets += [offset]
                offset += len(j)

            files[i['file']] = {
                'path': fpath,
                'raw': fdata,
                'offsets': line_offsets
            }

        # Replace in source code
        def remove_node(i):
            offsets = files[i['file']]['offsets']
            raw = files[i['file']]['raw']
            node = i['node']
            line = node.lineno - 1
            col = node.colno
            start = offsets[line] + col
            end = start
            if isinstance(node, (ArrayNode, FunctionNode)):
                end = offsets[node.end_lineno - 1] + node.end_colno

            # Only removal is supported for assignments
            elif isinstance(node, AssignmentNode) and i['action'] == 'rm':
                if isinstance(node.value, (ArrayNode, FunctionNode)):
                    remove_node({'file': i['file'], 'str': '', 'node': node.value, 'action': 'rm'})
                    raw = files[i['file']]['raw']
                # Both scans are bounded so that an assignment at the very end
                # of the file cannot run past the buffer.
                while end < len(raw) and raw[end] != '=':
                    end += 1
                end += 1 # Handle the '='
                while end < len(raw) and raw[end] in [' ', '\n', '\t']:
                    end += 1

            files[i['file']]['raw'] = raw[:start] + i['str'] + raw[end:]

        for i in str_list:
            if i['action'] in ['modify', 'rm']:
                remove_node(i)
            elif i['action'] in ['add']:
                files[i['file']]['raw'] += i['str'] + '\n'

        # Write the files back
        for key, val in files.items():
            mlog.log('Rewriting', mlog.yellow(key))
            with open(val['path'], 'w', encoding='utf-8') as fp:
                fp.write(val['raw'])
# Maps the operation names of the 'target' CLI sub-command to the operation
# names used in target command dictionaries.
target_operation_map = {
    'add': 'src_add',
    'rm': 'src_rm',
    'add_target': 'target_add',
    'rm_target': 'target_rm',
    'info': 'info',
}
def list_to_dict(in_list: List[str]) -> Dict[str, str]:
    """Turn a flat ``[key, value, key, value, ...]`` list into a dictionary.

    Raises TypeError when the list has an odd number of elements.
    """
    result = {}
    it = iter(in_list)
    try:
        for i in it:
            # calling next(it) is not a mistake, we're taking the next element from
            # the iterator, avoiding the need to preprocess it into a sequence of
            # key value pairs.
            result[i] = next(it)
    except StopIteration:
        # The StopIteration is an implementation detail; hide it from the traceback.
        raise TypeError('in_list parameter of list_to_dict must have an even length.') from None
    return result
def generate_target(options) -> List[dict]:
    """Build the command list for the 'target' CLI sub-command from parsed options."""
    return [{
        'type': 'target',
        # 'target' is a required key of target commands (see rewriter_keys)
        'target': options.target,
        'operation': target_operation_map[options.operation],
        'sources': options.sources,
        'subdir': options.subdir,
        'target_type': options.tgt_type,
    }]
def generate_kwargs(options) -> List[dict]:
    """Build the command list for the 'kwargs' CLI sub-command from parsed options."""
    return [{
        'type': 'kwargs',
        'function': options.function,
        # 'id' is a required key of kwargs commands (see rewriter_keys)
        'id': options.id,
        'operation': options.operation,
        'kwargs': list_to_dict(options.kwargs),
    }]
def generate_def_opts(options) -> List[dict]:
    """Build the command list for the 'default-options' CLI sub-command from parsed options."""
    return [{
        'type': 'default_options',
        'operation': options.operation,
        'options': list_to_dict(options.options),
    }]
def genreate_cmd(options) -> List[dict]:
    """Load the commands for the 'command' CLI sub-command.

    ``options.json`` is read as a file when it names an existing file and is
    parsed as a JSON string otherwise.
    """
    # isfile() rather than exists(): a directory that happens to share the
    # name must not be opened as a file.
    if os.path.isfile(options.json):
        with open(options.json, 'r', encoding='utf-8') as fp:
            return json.load(fp)
    return json.loads(options.json)
# Map options.type to the function generating the command list for it
cli_type_map = {
    'target': generate_target,
    'tgt': generate_target,
    'kwargs': generate_kwargs,
    'default-options': generate_def_opts,
    'def': generate_def_opts,
    'command': genreate_cmd,
    'cmd': genreate_cmd,
}
def run(options):
    """Entry point of the rewriter: build, run and apply the requested commands.

    Returns 0 on success and 1 when no command type was given; exceptions
    raised while processing propagate to the caller.
    """
    # NOTE(review): the mlog.set_quiet() / mlog.set_verbose() calls are
    # restored -- confirm these helpers against the mlog module.
    if not options.verbose:
        mlog.set_quiet()

    try:
        rewriter = Rewriter(options.sourcedir, skip_errors=options.skip)
        rewriter.analyze_meson()

        if options.type is None:
            mlog.error('No command specified')
            return 1

        commands = cli_type_map[options.type](options)

        if not isinstance(commands, list):
            raise TypeError('Command is not a list')

        for i in commands:
            # Every command must be a JSON object, i.e. a dict; a check
            # against `object` would accept anything.
            if not isinstance(i, dict):
                raise TypeError('Command is not an object')
            rewriter.process(i)

        rewriter.apply_changes()
        rewriter.print_info()
        return 0
    finally:
        # Restore logging no matter how the run ended
        mlog.set_verbose()