@ -45,9 +45,7 @@ except ImportError:
req_timeout = 600.0
req_timeout = 600.0
ssl_warning_printed = False
ssl_warning_printed = False
whitelist_domain = ' https://wrapdb.mesonbuild.com/ '
whitelist_subdomain = ' wrapdb.mesonbuild.com '
whitelist_domain_nossl = ' http://wrapdb.mesonbuild.com/ '
masquerade_str = ' wrapdb.mesonbuild.com '
def quiet_git ( cmd : typing . List [ str ] , workingdir : str ) - > typing . Tuple [ bool , str ] :
def quiet_git ( cmd : typing . List [ str ] , workingdir : str ) - > typing . Tuple [ bool , str ] :
@ -60,26 +58,34 @@ def quiet_git(cmd: typing.List[str], workingdir: str) -> typing.Tuple[bool, str]
return False , pc . stderr
return False , pc . stderr
return True , pc . stdout
return True , pc . stdout
def whitelist_wrapdb ( urlstr : str ) - > urllib . parse . ParseResult :
""" raises WrapException if not whitelisted subdomain """
url = urllib . parse . urlparse ( urlstr )
if not url . hostname :
raise WrapException ( ' {} is not a valid URL ' . format ( urlstr ) )
if not url . hostname . endswith ( whitelist_subdomain ) :
raise WrapException ( ' {} is not a whitelisted WrapDB URL ' . format ( urlstr ) )
if has_ssl and not url . scheme == ' https ' :
raise WrapException ( ' WrapDB did not have expected SSL https url, instead got {} ' . format ( urlstr ) )
return url
def open_wrapdburl ( urlstring : str ) - > ' http.client.HTTPResponse ' :
def open_wrapdburl ( urlstring : str ) - > ' http.client.HTTPResponse ' :
global ssl_warning_printed
global ssl_warning_printed
url = whitelist_wrapdb ( urlstring )
if has_ssl :
if has_ssl :
if not urlstring . startswith ( whitelist_domain ) :
raise WrapException ( ' {} is not a whitelisted URL ' . format ( urlstring ) )
try :
try :
return urllib . request . urlopen ( urlstring , timeout = req_timeout )
return urllib . request . urlopen ( urllib . parse . urlunparse ( url ) , timeout = req_timeout )
except urllib . error . URLError as excp :
except urllib . error . URLError as excp :
raise WrapException ( ' WrapDB connection failed to {} with error {} ' . format ( urlstring , excp ) )
raise WrapException ( ' WrapDB connection failed to {} with error {} ' . format ( urlstring , excp ) )
# following code is only for those without Python SSL
# following code is only for those without Python SSL
nossl_urlstring = urlstring . replace ( ' https:// ' , ' http:// ' )
nossl_url = url . _replace ( scheme = ' http ' )
if not nossl_urlstring . startswith ( whitelist_domain_nossl ) :
raise WrapException ( ' {} is not a whitelisted URL ' . format ( nossl_urlstring ) )
if not ssl_warning_printed :
if not ssl_warning_printed :
mlog . warning ( ' SSL module not available in {} : WrapDB traffic not authenticated. ' . format ( sys . executable ) )
mlog . warning ( ' SSL module not available in {} : WrapDB traffic not authenticated. ' . format ( sys . executable ) )
ssl_warning_printed = True
ssl_warning_printed = True
try :
try :
return urllib . request . urlopen ( nossl_urlstring , timeout = req_timeout )
return urllib . request . urlopen ( urllib . parse . urlunparse ( nossl_url ) , timeout = req_timeout )
except urllib . error . URLError as excp :
except urllib . error . URLError as excp :
raise WrapException ( ' WrapDB connection failed to {} with error {} ' . format ( urlstring , excp ) )
raise WrapException ( ' WrapDB connection failed to {} with error {} ' . format ( urlstring , excp ) )
@ -320,20 +326,22 @@ class Resolver:
subprocess . check_call ( [ svn , ' checkout ' , ' -r ' , revno , self . wrap . get ( ' url ' ) ,
subprocess . check_call ( [ svn , ' checkout ' , ' -r ' , revno , self . wrap . get ( ' url ' ) ,
self . directory ] , cwd = self . subdir_root )
self . directory ] , cwd = self . subdir_root )
def get_data ( self , url : str ) - > typing . Tuple [ str , str ] :
def get_data ( self , urlstring : str ) - > typing . Tuple [ str , str ] :
blocksize = 10 * 1024
blocksize = 10 * 1024
h = hashlib . sha256 ( )
h = hashlib . sha256 ( )
tmpfile = tempfile . NamedTemporaryFile ( mode = ' wb ' , dir = self . cachedir , delete = False )
tmpfile = tempfile . NamedTemporaryFile ( mode = ' wb ' , dir = self . cachedir , delete = False )
hostname = urllib . parse . urlparse ( url ) . hostname
url = urllib . parse . urlparse ( urlstring )
if hostname == ' wrapdb.mesonbuild.com ' or hostname . endswith ( ' .wrapdb.mesonbuild.com ' ) :
if not url . hostname :
resp = open_wrapdburl ( url )
raise WrapException ( ' {} is not a valid URL ' . format ( urlstring ) )
elif masquerade_str in url :
if url . hostname . endswith ( whitelist_subdomain ) :
raise WrapException ( ' {} may be a WrapDB-impersonating URL ' . format ( url ) )
resp = open_wrapdburl ( urlstring )
elif whitelist_subdomain in urlstring :
raise WrapException ( ' {} may be a WrapDB-impersonating URL ' . format ( urlstring ) )
else :
else :
try :
try :
resp = urllib . request . urlopen ( url , timeout = req_timeout )
resp = urllib . request . urlopen ( urlstring , timeout = req_timeout )
except urllib . error . URLError :
except urllib . error . URLError :
raise WrapException ( ' could not get {} is the internet available? ' . format ( url ) )
raise WrapException ( ' could not get {} is the internet available? ' . format ( urlstring ) )
with contextlib . closing ( resp ) as resp :
with contextlib . closing ( resp ) as resp :
try :
try :
dlsize = int ( resp . info ( ) [ ' Content-Length ' ] )
dlsize = int ( resp . info ( ) [ ' Content-Length ' ] )