wrap.py: apply type annotation, modernize syntax

correct syntax issues, missing imports revealed by type annotation checking
pull/6148/head
Michael Hirsch, Ph.D 5 years ago committed by Jussi Pakkanen
parent 30acd94e68
commit a47c1374b9
  1. 4
      mesonbuild/wrap/__init__.py
  2. 73
      mesonbuild/wrap/wrap.py
  3. 41
      mesonbuild/wrap/wraptool.py

@ -48,10 +48,10 @@ class WrapMode(Enum):
nodownload = 3
forcefallback = 4
def __str__(self):
def __str__(self) -> str:
return self.name
@staticmethod
def from_string(mode_name):
def from_string(mode_name: str):
g = string_to_value[mode_name]
return WrapMode(g)

@ -14,14 +14,24 @@
from .. import mlog
import contextlib
import urllib.request, os, hashlib, shutil, tempfile, stat
import urllib.request
import urllib.error
import os
import hashlib
import shutil
import tempfile
import stat
import subprocess
import sys
import configparser
import typing
from . import WrapMode
from ..mesonlib import ProgressBar, MesonException
if typing.TYPE_CHECKING:
import http.client
try:
import ssl
has_ssl = True
@ -33,7 +43,7 @@ except ImportError:
req_timeout = 600.0
ssl_warning_printed = False
def build_ssl_context():
def build_ssl_context() -> 'ssl.SSLContext':
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
@ -41,18 +51,17 @@ def build_ssl_context():
ctx.load_default_certs()
return ctx
def quiet_git(cmd, workingdir):
def quiet_git(cmd: typing.List[str], workingdir: str) -> typing.Tuple[bool, str]:
try:
pc = subprocess.Popen(['git', '-C', workingdir] + cmd, stdin=subprocess.DEVNULL,
pc = subprocess.run(['git', '-C', workingdir] + cmd, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except FileNotFoundError as e:
return False, str(e)
out, err = pc.communicate()
if pc.returncode != 0:
return False, err
return True, out
return False, pc.stderr
return True, pc.stdout
def open_wrapdburl(urlstring):
def open_wrapdburl(urlstring: str) -> 'http.client.HTTPResponse':
global ssl_warning_printed
if has_ssl:
try:
@ -78,7 +87,7 @@ class WrapNotFoundException(WrapException):
pass
class PackageDefinition:
def __init__(self, fname):
def __init__(self, fname: str):
self.filename = fname
self.basename = os.path.basename(fname)
self.name = self.basename[:-5]
@ -96,23 +105,23 @@ class PackageDefinition:
self.type = self.wrap_section[5:]
self.values = dict(self.config[self.wrap_section])
def get(self, key):
def get(self, key: str) -> str:
try:
return self.values[key]
except KeyError:
m = 'Missing key {!r} in {}'
raise WrapException(m.format(key, self.basename))
def has_patch(self):
def has_patch(self) -> bool:
return 'patch_url' in self.values
class Resolver:
def __init__(self, subdir_root, wrap_mode=WrapMode.default):
def __init__(self, subdir_root: str, wrap_mode=WrapMode.default):
self.wrap_mode = wrap_mode
self.subdir_root = subdir_root
self.cachedir = os.path.join(self.subdir_root, 'packagecache')
def resolve(self, packagename: str, method: str):
def resolve(self, packagename: str, method: str) -> str:
self.packagename = packagename
self.directory = packagename
# We always have to load the wrap file, if it exists, because it could
@ -168,20 +177,20 @@ class Resolver:
return self.directory
def load_wrap(self):
def load_wrap(self) -> PackageDefinition:
fname = os.path.join(self.subdir_root, self.packagename + '.wrap')
if os.path.isfile(fname):
return PackageDefinition(fname)
return None
def check_can_download(self):
def check_can_download(self) -> None:
# Don't download subproject data based on wrap file if requested.
# Git submodules are ok (see above)!
if self.wrap_mode is WrapMode.nodownload:
m = 'Automatic wrap-based subproject downloading is disabled'
raise WrapException(m)
def resolve_git_submodule(self):
def resolve_git_submodule(self) -> bool:
# Are we in a git repository?
ret, out = quiet_git(['rev-parse'], self.subdir_root)
if not ret:
@ -191,29 +200,29 @@ class Resolver:
if not ret:
return False
# Submodule has not been added, add it
if out.startswith(b'+'):
if out.startswith('+'):
mlog.warning('git submodule might be out of date')
return True
elif out.startswith(b'U'):
elif out.startswith('U'):
raise WrapException('git submodule has merge conflicts')
# Submodule exists, but is deinitialized or wasn't initialized
elif out.startswith(b'-'):
elif out.startswith('-'):
if subprocess.call(['git', '-C', self.subdir_root, 'submodule', 'update', '--init', self.dirname]) == 0:
return True
raise WrapException('git submodule failed to init')
# Submodule looks fine, but maybe it wasn't populated properly. Do a checkout.
elif out.startswith(b' '):
elif out.startswith(' '):
subprocess.call(['git', 'checkout', '.'], cwd=self.dirname)
# Even if checkout failed, try building it anyway and let the user
# handle any problems manually.
return True
elif out == b'':
elif out == '':
# It is not a submodule, just a folder that exists in the main repository.
return False
m = 'Unknown git submodule output: {!r}'
raise WrapException(m.format(out))
def get_file(self):
def get_file(self) -> None:
path = self.get_file_internal('source')
extract_dir = self.subdir_root
# Some upstreams ship packages that do not have a leading directory.
@ -225,7 +234,7 @@ class Resolver:
if self.wrap.has_patch():
self.apply_patch()
def get_git(self):
def get_git(self) -> None:
revno = self.wrap.get('revision')
is_shallow = self.wrap.values.get('depth', '') != ''
# for some reason git only allows commit ids to be shallowly fetched by fetch not with clone
@ -271,13 +280,13 @@ class Resolver:
'--push', 'origin', push_url],
cwd=self.dirname)
def is_git_full_commit_id(self, revno):
def is_git_full_commit_id(self, revno: str) -> bool:
result = False
if len(revno) in (40, 64): # 40 for sha1, 64 for upcoming sha256
result = all((ch in '0123456789AaBbCcDdEeFf' for ch in revno))
return result
def get_hg(self):
def get_hg(self) -> None:
revno = self.wrap.get('revision')
subprocess.check_call(['hg', 'clone', self.wrap.get('url'),
self.directory], cwd=self.subdir_root)
@ -285,12 +294,12 @@ class Resolver:
subprocess.check_call(['hg', 'checkout', revno],
cwd=self.dirname)
def get_svn(self):
def get_svn(self) -> None:
revno = self.wrap.get('revision')
subprocess.check_call(['svn', 'checkout', '-r', revno, self.wrap.get('url'),
self.directory], cwd=self.subdir_root)
def get_data(self, url):
def get_data(self, url: str) -> typing.Tuple[str, str]:
blocksize = 10 * 1024
h = hashlib.sha256()
tmpfile = tempfile.NamedTemporaryFile(mode='wb', dir=self.cachedir, delete=False)
@ -327,7 +336,7 @@ class Resolver:
hashvalue = h.hexdigest()
return hashvalue, tmpfile.name
def check_hash(self, what, path):
def check_hash(self, what: str, path: str) -> None:
expected = self.wrap.get(what + '_hash')
h = hashlib.sha256()
with open(path, 'rb') as f:
@ -336,7 +345,7 @@ class Resolver:
if dhash != expected:
raise WrapException('Incorrect hash for %s:\n %s expected\n %s actual.' % (what, expected, dhash))
def download(self, what, ofname):
def download(self, what: str, ofname: str) -> None:
self.check_can_download()
srcurl = self.wrap.get(what + '_url')
mlog.log('Downloading', mlog.bold(self.packagename), what, 'from', mlog.bold(srcurl))
@ -347,7 +356,7 @@ class Resolver:
raise WrapException('Incorrect hash for %s:\n %s expected\n %s actual.' % (what, expected, dhash))
os.rename(tmpfile, ofname)
def get_file_internal(self, what):
def get_file_internal(self, what: str) -> str:
filename = self.wrap.get(what + '_filename')
cache_path = os.path.join(self.cachedir, filename)
@ -361,7 +370,7 @@ class Resolver:
self.download(what, cache_path)
return cache_path
def apply_patch(self):
def apply_patch(self) -> None:
path = self.get_file_internal('patch')
try:
shutil.unpack_archive(path, self.subdir_root)
@ -370,7 +379,7 @@ class Resolver:
shutil.unpack_archive(path, workdir)
self.copy_tree(workdir, self.subdir_root)
def copy_tree(self, root_src_dir, root_dst_dir):
def copy_tree(self, root_src_dir: str, root_dst_dir: str) -> None:
"""
Copy directory tree. Overwrites also read only files.
"""

@ -58,9 +58,8 @@ def get_result(urlstring):
data = u.read().decode('utf-8')
jd = json.loads(data)
if jd['output'] != 'ok':
print('Got bad output from server.')
print(data)
sys.exit(1)
print('Got bad output from server.', file=sys.stderr)
raise SystemExit(data)
return jd
def get_projectlist():
@ -79,7 +78,7 @@ def search(options):
for p in jd['projects']:
print(p)
def get_latest_version(name):
def get_latest_version(name: str) -> tuple:
jd = get_result(API_ROOT + 'query/get_latest/' + name)
branch = jd['branch']
revision = jd['revision']
@ -88,15 +87,12 @@ def get_latest_version(name):
def install(options):
name = options.name
if not os.path.isdir('subprojects'):
print('Subprojects dir not found. Run this script in your source root directory.')
sys.exit(1)
raise SystemExit('Subprojects dir not found. Run this script in your source root directory.')
if os.path.isdir(os.path.join('subprojects', name)):
print('Subproject directory for this project already exists.')
sys.exit(1)
raise SystemExit('Subproject directory for this project already exists.')
wrapfile = os.path.join('subprojects', name + '.wrap')
if os.path.exists(wrapfile):
print('Wrap file already exists.')
sys.exit(1)
raise SystemExit('Wrap file already exists.')
(branch, revision) = get_latest_version(name)
u = open_wrapdburl(API_ROOT + 'projects/%s/%s/%s/get_wrap' % (name, branch, revision))
data = u.read()
@ -125,17 +121,15 @@ def update_wrap_file(wrapfile, name, new_branch, new_revision):
def update(options):
name = options.name
if not os.path.isdir('subprojects'):
print('Subprojects dir not found. Run this command in your source root directory.')
sys.exit(1)
raise SystemExit('Subprojects dir not found. Run this command in your source root directory.')
wrapfile = os.path.join('subprojects', name + '.wrap')
if not os.path.exists(wrapfile):
print('Project', name, 'is not in use.')
sys.exit(1)
raise SystemExit('Project', name, 'is not in use.')
(branch, revision, subdir, src_file, patch_file) = get_current_version(wrapfile)
(new_branch, new_revision) = get_latest_version(name)
if new_branch == branch and new_revision == revision:
print('Project', name, 'is already up to date.')
sys.exit(0)
raise SystemExit
update_wrap_file(wrapfile, name, new_branch, new_revision)
shutil.rmtree(os.path.join('subprojects', subdir), ignore_errors=True)
try:
@ -153,8 +147,7 @@ def info(options):
jd = get_result(API_ROOT + 'projects/' + name)
versions = jd['versions']
if not versions:
print('No available versions of', name)
sys.exit(0)
raise SystemExit('No available versions of' + name)
print('Available versions of %s:' % name)
for v in versions:
print(' ', v['branch'], v['revision'])
@ -167,7 +160,7 @@ def do_promotion(from_path, spdir_name):
sproj_name = os.path.basename(from_path)
outputdir = os.path.join(spdir_name, sproj_name)
if os.path.exists(outputdir):
sys.exit('Output dir %s already exists. Will not overwrite.' % outputdir)
raise SystemExit('Output dir %s already exists. Will not overwrite.' % outputdir)
shutil.copytree(from_path, outputdir, ignore=shutil.ignore_patterns('subprojects'))
def promote(options):
@ -184,13 +177,13 @@ def promote(options):
# otherwise the argument is just a subproject basename which must be unambiguous
if argument not in sprojs:
sys.exit('Subproject %s not found in directory tree.' % argument)
raise SystemExit('Subproject %s not found in directory tree.' % argument)
matches = sprojs[argument]
if len(matches) > 1:
print('There is more than one version of %s in tree. Please specify which one to promote:\n' % argument)
print('There is more than one version of %s in tree. Please specify which one to promote:\n' % argument, file=sys.stderr)
for s in matches:
print(s)
sys.exit(1)
print(s, file=sys.stderr)
raise SystemExit(1)
do_promotion(matches[0], spdir_name)
def status(options):
@ -200,12 +193,12 @@ def status(options):
try:
(latest_branch, latest_revision) = get_latest_version(name)
except Exception:
print('', name, 'not available in wrapdb.')
print('', name, 'not available in wrapdb.', file=sys.stderr)
continue
try:
(current_branch, current_revision, _, _, _) = get_current_version(w)
except Exception:
print('Wrap file not from wrapdb.')
print('Wrap file not from wrapdb.', file=sys.stderr)
continue
if current_branch == latest_branch and current_revision == latest_revision:
print('', name, 'up to date. Branch %s, revision %d.' % (current_branch, current_revision))

Loading…
Cancel
Save