From 449dd8e72a3ccc4c6f7ec70169515784cd687a2c Mon Sep 17 00:00:00 2001 From: Daniel Mensinger Date: Fri, 28 Aug 2020 21:23:10 +0200 Subject: [PATCH] typing: fully annotate wrap --- mesonbuild/wrap/__init__.py | 2 +- mesonbuild/wrap/wrap.py | 30 +++++++++++++------------- mesonbuild/wrap/wraptool.py | 42 +++++++++++++++++++++---------------- run_mypy.py | 2 +- 4 files changed, 41 insertions(+), 35 deletions(-) diff --git a/mesonbuild/wrap/__init__.py b/mesonbuild/wrap/__init__.py index 182a94de5..17711461e 100644 --- a/mesonbuild/wrap/__init__.py +++ b/mesonbuild/wrap/__init__.py @@ -52,6 +52,6 @@ class WrapMode(Enum): return self.name @staticmethod - def from_string(mode_name: str): + def from_string(mode_name: str) -> 'WrapMode': g = string_to_value[mode_name] return WrapMode(g) diff --git a/mesonbuild/wrap/wrap.py b/mesonbuild/wrap/wrap.py index aba220e1b..eacc1ddd5 100644 --- a/mesonbuild/wrap/wrap.py +++ b/mesonbuild/wrap/wrap.py @@ -83,7 +83,7 @@ def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse': url = whitelist_wrapdb(urlstring) if has_ssl: try: - return urllib.request.urlopen(urllib.parse.urlunparse(url), timeout=REQ_TIMEOUT) + return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(url), timeout=REQ_TIMEOUT)) except urllib.error.URLError as excp: raise WrapException('WrapDB connection failed to {} with error {}'.format(urlstring, excp)) @@ -93,7 +93,7 @@ def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse': mlog.warning('SSL module not available in {}: WrapDB traffic not authenticated.'.format(sys.executable)) SSL_WARNING_PRINTED = True try: - return urllib.request.urlopen(urllib.parse.urlunparse(nossl_url), timeout=REQ_TIMEOUT) + return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(nossl_url), timeout=REQ_TIMEOUT)) except urllib.error.URLError as excp: raise WrapException('WrapDB connection failed to {} with error {}'.format(urlstring, excp)) @@ -107,7 +107,7 @@ class WrapNotFoundException(WrapException): class PackageDefinition: def __init__(self, fname: str): self.filename = fname - self.type = None + self.type = None # type: str self.values = {} # type: T.Dict[str, str] self.provided_deps = {} # type: T.Dict[str, T.Optional[str]] self.provided_programs = [] # type: T.List[str] @@ -122,7 +122,7 @@ class PackageDefinition: if os.path.dirname(self.directory): raise WrapException('Directory key must be a name and not a path') - def parse_wrap(self, fname: str): + def parse_wrap(self, fname: str) -> None: try: self.config = configparser.ConfigParser(interpolation=None) self.config.read(fname) @@ -131,7 +131,7 @@ class PackageDefinition: self.parse_wrap_section() self.parse_provide_section() - def parse_wrap_section(self): + def parse_wrap_section(self) -> None: if len(self.config.sections()) < 1: raise WrapException('Missing sections in {}'.format(self.basename)) self.wrap_section = self.config.sections()[0] @@ -141,19 +141,19 @@ class PackageDefinition: self.type = self.wrap_section[5:] self.values = dict(self.config[self.wrap_section]) - def parse_provide_section(self): + def parse_provide_section(self) -> None: if self.config.has_section('provide'): for k, v in self.config['provide'].items(): if k == 'dependency_names': # A comma separated list of dependency names that does not # need a variable name - names = {n.strip(): None for n in v.split(',')} - self.provided_deps.update(names) + names_dict = {n.strip(): None for n in v.split(',')} + self.provided_deps.update(names_dict) continue if k == 'program_names': # A comma separated list of program names - names = [n.strip() for n in v.split(',')] - self.provided_programs += names + names_list = [n.strip() for n in v.split(',')] + self.provided_programs += names_list continue if not v: m = ('Empty dependency variable name for {!r} in {}. ' @@ -177,7 +177,7 @@ def get_directory(subdir_root: str, packagename: str) -> str: return packagename class Resolver: - def __init__(self, subdir_root: str, wrap_mode=WrapMode.default): + def __init__(self, subdir_root: str, wrap_mode: WrapMode = WrapMode.default) -> None: self.wrap_mode = wrap_mode self.subdir_root = subdir_root self.cachedir = os.path.join(self.subdir_root, 'packagecache') @@ -187,7 +187,7 @@ class Resolver: self.provided_programs = {} # type: T.Dict[str, PackageDefinition] self.load_wraps() - def load_wraps(self): + def load_wraps(self) -> None: if not os.path.isdir(self.subdir_root): return root, dirs, files = next(os.walk(self.subdir_root)) @@ -221,7 +221,7 @@ class Resolver: raise WrapException(m.format(k, wrap.basename, prev_wrap.basename)) self.provided_programs[k] = wrap - def find_dep_provider(self, packagename: str): + def find_dep_provider(self, packagename: str) -> T.Optional[T.Union[str, T.List[str]]]: # Return value is in the same format as fallback kwarg: # ['subproject_name', 'variable_name'], or 'subproject_name'. wrap = self.provided_deps.get(packagename) @@ -232,7 +232,7 @@ class Resolver: return wrap.name return None - def find_program_provider(self, names: T.List[str]): + def find_program_provider(self, names: T.List[str]) -> T.Optional[str]: for name in names: wrap = self.provided_programs.get(name) if wrap: @@ -464,7 +464,7 @@ class Resolver: if dhash != expected: raise WrapException('Incorrect hash for {}:\n {} expected\n {} actual.'.format(what, expected, dhash)) - def download(self, what: str, ofname: str, fallback=False) -> None: + def download(self, what: str, ofname: str, fallback: bool = False) -> None: self.check_can_download() srcurl = self.wrap.get(what + ('_fallback_url' if fallback else '_url')) mlog.log('Downloading', mlog.bold(self.packagename), what, 'from', mlog.bold(srcurl)) diff --git a/mesonbuild/wrap/wraptool.py b/mesonbuild/wrap/wraptool.py index e01e15985..d4208896e 100644 --- a/mesonbuild/wrap/wraptool.py +++ b/mesonbuild/wrap/wraptool.py @@ -16,6 +16,7 @@ import json import sys, os import configparser import shutil +import typing as T from glob import glob @@ -23,7 +24,10 @@ from .wrap import API_ROOT, open_wrapdburl from .. import mesonlib -def add_arguments(parser): +if T.TYPE_CHECKING: + import argparse + +def add_arguments(parser: 'argparse.ArgumentParser') -> None: subparsers = parser.add_subparsers(title='Commands', dest='command') subparsers.required = True @@ -53,26 +57,28 @@ def add_arguments(parser): p.add_argument('project_path') p.set_defaults(wrap_func=promote) -def get_result(urlstring): +def get_result(urlstring: str) -> T.Dict[str, T.Any]: u = open_wrapdburl(urlstring) data = u.read().decode('utf-8') jd = json.loads(data) if jd['output'] != 'ok': print('Got bad output from server.', file=sys.stderr) raise SystemExit(data) + assert isinstance(jd, dict) return jd -def get_projectlist(): +def get_projectlist() -> T.List[str]: jd = get_result(API_ROOT + 'projects') projects = jd['projects'] + assert isinstance(projects, list) return projects -def list_projects(options): +def list_projects(options: 'argparse.Namespace') -> None: projects = get_projectlist() for p in projects: print(p) -def search(options): +def search(options: 'argparse.Namespace') -> None: name = options.name jd = get_result(API_ROOT + 'query/byname/' + name) for p in jd['projects']: @@ -84,7 +90,7 @@ def get_latest_version(name: str) -> tuple: revision = jd['revision'] return branch, revision -def install(options): +def install(options: 'argparse.Namespace') -> None: name = options.name if not os.path.isdir('subprojects'): raise SystemExit('Subprojects dir not found. Run this script in your source root directory.') @@ -100,25 +106,25 @@ def install(options): f.write(data) print('Installed', name, 'branch', branch, 'revision', revision) -def parse_patch_url(patch_url): +def parse_patch_url(patch_url: str) -> T.Tuple[str, int]: arr = patch_url.split('/') return arr[-3], int(arr[-2]) -def get_current_version(wrapfile): +def get_current_version(wrapfile: str) -> T.Tuple[str, int, str, str, str]: cp = configparser.ConfigParser(interpolation=None) cp.read(wrapfile) - cp = cp['wrap-file'] - patch_url = cp['patch_url'] + wrap_data = cp['wrap-file'] + patch_url = wrap_data['patch_url'] branch, revision = parse_patch_url(patch_url) - return branch, revision, cp['directory'], cp['source_filename'], cp['patch_filename'] + return branch, revision, wrap_data['directory'], wrap_data['source_filename'], wrap_data['patch_filename'] -def update_wrap_file(wrapfile, name, new_branch, new_revision): +def update_wrap_file(wrapfile: str, name: str, new_branch: str, new_revision: str) -> None: u = open_wrapdburl(API_ROOT + 'projects/{}/{}/{}/get_wrap'.format(name, new_branch, new_revision)) data = u.read() with open(wrapfile, 'wb') as f: f.write(data) -def update(options): +def update(options: 'argparse.Namespace') -> None: name = options.name if not os.path.isdir('subprojects'): raise SystemExit('Subprojects dir not found. Run this command in your source root directory.') @@ -142,7 +148,7 @@ def update(options): pass print('Updated', name, 'to branch', new_branch, 'revision', new_revision) -def info(options): +def info(options: 'argparse.Namespace') -> None: name = options.name jd = get_result(API_ROOT + 'projects/' + name) versions = jd['versions'] @@ -152,7 +158,7 @@ def info(options): for v in versions: print(' ', v['branch'], v['revision']) -def do_promotion(from_path, spdir_name): +def do_promotion(from_path: str, spdir_name: str) -> None: if os.path.isfile(from_path): assert(from_path.endswith('.wrap')) shutil.copy(from_path, spdir_name) @@ -163,7 +169,7 @@ def do_promotion(from_path, spdir_name): raise SystemExit('Output dir {} already exists. Will not overwrite.'.format(outputdir)) shutil.copytree(from_path, outputdir, ignore=shutil.ignore_patterns('subprojects')) -def promote(options): +def promote(options: 'argparse.Namespace') -> None: argument = options.project_path spdir_name = 'subprojects' sprojs = mesonlib.detect_subprojects(spdir_name) @@ -186,7 +192,7 @@ def promote(options): raise SystemExit(1) do_promotion(matches[0], spdir_name) -def status(options): +def status(options: 'argparse.Namespace') -> None: print('Subproject status') for w in glob('subprojects/*.wrap'): name = os.path.basename(w)[:-5] @@ -205,6 +211,6 @@ def status(options): else: print('', name, 'not up to date. Have {} {}, but {} {} is available.'.format(current_branch, current_revision, latest_branch, latest_revision)) -def run(options): +def run(options: 'argparse.Namespace') -> int: options.wrap_func(options) return 0 diff --git a/run_mypy.py b/run_mypy.py index 4e5616854..aca42da13 100755 --- a/run_mypy.py +++ b/run_mypy.py @@ -34,7 +34,7 @@ strict_modules = [ 'mesonbuild/mesonlib.py', 'mesonbuild/mlog.py', 'mesonbuild/ast', - # 'mesonbuild/wrap', + 'mesonbuild/wrap', 'run_mypy.py', ]