typing: fully annotate wrap

pull/7657/head
Daniel Mensinger 4 years ago
parent fb9738b8c7
commit 449dd8e72a
No known key found for this signature in database
GPG Key ID: 54DD94C131E277D4
  1. 2
      mesonbuild/wrap/__init__.py
  2. 30
      mesonbuild/wrap/wrap.py
  3. 42
      mesonbuild/wrap/wraptool.py
  4. 2
      run_mypy.py

@ -52,6 +52,6 @@ class WrapMode(Enum):
return self.name
@staticmethod
def from_string(mode_name: str):
def from_string(mode_name: str) -> 'WrapMode':
g = string_to_value[mode_name]
return WrapMode(g)

@ -83,7 +83,7 @@ def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse':
url = whitelist_wrapdb(urlstring)
if has_ssl:
try:
return urllib.request.urlopen(urllib.parse.urlunparse(url), timeout=REQ_TIMEOUT)
return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(url), timeout=REQ_TIMEOUT))
except urllib.error.URLError as excp:
raise WrapException('WrapDB connection failed to {} with error {}'.format(urlstring, excp))
@ -93,7 +93,7 @@ def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse':
mlog.warning('SSL module not available in {}: WrapDB traffic not authenticated.'.format(sys.executable))
SSL_WARNING_PRINTED = True
try:
return urllib.request.urlopen(urllib.parse.urlunparse(nossl_url), timeout=REQ_TIMEOUT)
return T.cast('http.client.HTTPResponse', urllib.request.urlopen(urllib.parse.urlunparse(nossl_url), timeout=REQ_TIMEOUT))
except urllib.error.URLError as excp:
raise WrapException('WrapDB connection failed to {} with error {}'.format(urlstring, excp))
@ -107,7 +107,7 @@ class WrapNotFoundException(WrapException):
class PackageDefinition:
def __init__(self, fname: str):
self.filename = fname
self.type = None
self.type = None # type: str
self.values = {} # type: T.Dict[str, str]
self.provided_deps = {} # type: T.Dict[str, T.Optional[str]]
self.provided_programs = [] # type: T.List[str]
@ -122,7 +122,7 @@ class PackageDefinition:
if os.path.dirname(self.directory):
raise WrapException('Directory key must be a name and not a path')
def parse_wrap(self, fname: str):
def parse_wrap(self, fname: str) -> None:
try:
self.config = configparser.ConfigParser(interpolation=None)
self.config.read(fname)
@ -131,7 +131,7 @@ class PackageDefinition:
self.parse_wrap_section()
self.parse_provide_section()
def parse_wrap_section(self):
def parse_wrap_section(self) -> None:
if len(self.config.sections()) < 1:
raise WrapException('Missing sections in {}'.format(self.basename))
self.wrap_section = self.config.sections()[0]
@ -141,19 +141,19 @@ class PackageDefinition:
self.type = self.wrap_section[5:]
self.values = dict(self.config[self.wrap_section])
def parse_provide_section(self):
def parse_provide_section(self) -> None:
if self.config.has_section('provide'):
for k, v in self.config['provide'].items():
if k == 'dependency_names':
# A comma separated list of dependency names that does not
# need a variable name
names = {n.strip(): None for n in v.split(',')}
self.provided_deps.update(names)
names_dict = {n.strip(): None for n in v.split(',')}
self.provided_deps.update(names_dict)
continue
if k == 'program_names':
# A comma separated list of program names
names = [n.strip() for n in v.split(',')]
self.provided_programs += names
names_list = [n.strip() for n in v.split(',')]
self.provided_programs += names_list
continue
if not v:
m = ('Empty dependency variable name for {!r} in {}. '
@ -177,7 +177,7 @@ def get_directory(subdir_root: str, packagename: str) -> str:
return packagename
class Resolver:
def __init__(self, subdir_root: str, wrap_mode=WrapMode.default):
def __init__(self, subdir_root: str, wrap_mode: WrapMode = WrapMode.default) -> None:
self.wrap_mode = wrap_mode
self.subdir_root = subdir_root
self.cachedir = os.path.join(self.subdir_root, 'packagecache')
@ -187,7 +187,7 @@ class Resolver:
self.provided_programs = {} # type: T.Dict[str, PackageDefinition]
self.load_wraps()
def load_wraps(self):
def load_wraps(self) -> None:
if not os.path.isdir(self.subdir_root):
return
root, dirs, files = next(os.walk(self.subdir_root))
@ -221,7 +221,7 @@ class Resolver:
raise WrapException(m.format(k, wrap.basename, prev_wrap.basename))
self.provided_programs[k] = wrap
def find_dep_provider(self, packagename: str):
def find_dep_provider(self, packagename: str) -> T.Optional[T.Union[str, T.List[str]]]:
# Return value is in the same format as fallback kwarg:
# ['subproject_name', 'variable_name'], or 'subproject_name'.
wrap = self.provided_deps.get(packagename)
@ -232,7 +232,7 @@ class Resolver:
return wrap.name
return None
def find_program_provider(self, names: T.List[str]):
def find_program_provider(self, names: T.List[str]) -> T.Optional[str]:
for name in names:
wrap = self.provided_programs.get(name)
if wrap:
@ -464,7 +464,7 @@ class Resolver:
if dhash != expected:
raise WrapException('Incorrect hash for {}:\n {} expected\n {} actual.'.format(what, expected, dhash))
def download(self, what: str, ofname: str, fallback=False) -> None:
def download(self, what: str, ofname: str, fallback: bool = False) -> None:
self.check_can_download()
srcurl = self.wrap.get(what + ('_fallback_url' if fallback else '_url'))
mlog.log('Downloading', mlog.bold(self.packagename), what, 'from', mlog.bold(srcurl))

@ -16,6 +16,7 @@ import json
import sys, os
import configparser
import shutil
import typing as T
from glob import glob
@ -23,7 +24,10 @@ from .wrap import API_ROOT, open_wrapdburl
from .. import mesonlib
def add_arguments(parser):
if T.TYPE_CHECKING:
import argparse
def add_arguments(parser: 'argparse.ArgumentParser') -> None:
subparsers = parser.add_subparsers(title='Commands', dest='command')
subparsers.required = True
@ -53,26 +57,28 @@ def add_arguments(parser):
p.add_argument('project_path')
p.set_defaults(wrap_func=promote)
def get_result(urlstring):
def get_result(urlstring: str) -> T.Dict[str, T.Any]:
u = open_wrapdburl(urlstring)
data = u.read().decode('utf-8')
jd = json.loads(data)
if jd['output'] != 'ok':
print('Got bad output from server.', file=sys.stderr)
raise SystemExit(data)
assert isinstance(jd, dict)
return jd
def get_projectlist():
def get_projectlist() -> T.List[str]:
jd = get_result(API_ROOT + 'projects')
projects = jd['projects']
assert isinstance(projects, list)
return projects
def list_projects(options):
def list_projects(options: 'argparse.Namespace') -> None:
projects = get_projectlist()
for p in projects:
print(p)
def search(options):
def search(options: 'argparse.Namespace') -> None:
name = options.name
jd = get_result(API_ROOT + 'query/byname/' + name)
for p in jd['projects']:
@ -84,7 +90,7 @@ def get_latest_version(name: str) -> tuple:
revision = jd['revision']
return branch, revision
def install(options):
def install(options: 'argparse.Namespace') -> None:
name = options.name
if not os.path.isdir('subprojects'):
raise SystemExit('Subprojects dir not found. Run this script in your source root directory.')
@ -100,25 +106,25 @@ def install(options):
f.write(data)
print('Installed', name, 'branch', branch, 'revision', revision)
def parse_patch_url(patch_url):
def parse_patch_url(patch_url: str) -> T.Tuple[str, int]:
arr = patch_url.split('/')
return arr[-3], int(arr[-2])
def get_current_version(wrapfile):
def get_current_version(wrapfile: str) -> T.Tuple[str, int, str, str, str]:
cp = configparser.ConfigParser(interpolation=None)
cp.read(wrapfile)
cp = cp['wrap-file']
patch_url = cp['patch_url']
wrap_data = cp['wrap-file']
patch_url = wrap_data['patch_url']
branch, revision = parse_patch_url(patch_url)
return branch, revision, cp['directory'], cp['source_filename'], cp['patch_filename']
return branch, revision, wrap_data['directory'], wrap_data['source_filename'], wrap_data['patch_filename']
def update_wrap_file(wrapfile, name, new_branch, new_revision):
def update_wrap_file(wrapfile: str, name: str, new_branch: str, new_revision: str) -> None:
u = open_wrapdburl(API_ROOT + 'projects/{}/{}/{}/get_wrap'.format(name, new_branch, new_revision))
data = u.read()
with open(wrapfile, 'wb') as f:
f.write(data)
def update(options):
def update(options: 'argparse.Namespace') -> None:
name = options.name
if not os.path.isdir('subprojects'):
raise SystemExit('Subprojects dir not found. Run this command in your source root directory.')
@ -142,7 +148,7 @@ def update(options):
pass
print('Updated', name, 'to branch', new_branch, 'revision', new_revision)
def info(options):
def info(options: 'argparse.Namespace') -> None:
name = options.name
jd = get_result(API_ROOT + 'projects/' + name)
versions = jd['versions']
@ -152,7 +158,7 @@ def info(options):
for v in versions:
print(' ', v['branch'], v['revision'])
def do_promotion(from_path, spdir_name):
def do_promotion(from_path: str, spdir_name: str) -> None:
if os.path.isfile(from_path):
assert(from_path.endswith('.wrap'))
shutil.copy(from_path, spdir_name)
@ -163,7 +169,7 @@ def do_promotion(from_path, spdir_name):
raise SystemExit('Output dir {} already exists. Will not overwrite.'.format(outputdir))
shutil.copytree(from_path, outputdir, ignore=shutil.ignore_patterns('subprojects'))
def promote(options):
def promote(options: 'argparse.Namespace') -> None:
argument = options.project_path
spdir_name = 'subprojects'
sprojs = mesonlib.detect_subprojects(spdir_name)
@ -186,7 +192,7 @@ def promote(options):
raise SystemExit(1)
do_promotion(matches[0], spdir_name)
def status(options):
def status(options: 'argparse.Namespace') -> None:
print('Subproject status')
for w in glob('subprojects/*.wrap'):
name = os.path.basename(w)[:-5]
@ -205,6 +211,6 @@ def status(options):
else:
print('', name, 'not up to date. Have {} {}, but {} {} is available.'.format(current_branch, current_revision, latest_branch, latest_revision))
def run(options):
def run(options: 'argparse.Namespace') -> int:
options.wrap_func(options)
return 0

@ -34,7 +34,7 @@ strict_modules = [
'mesonbuild/mesonlib.py',
'mesonbuild/mlog.py',
'mesonbuild/ast',
# 'mesonbuild/wrap',
'mesonbuild/wrap',
'run_mypy.py',
]

Loading…
Cancel
Save