@@ -1,13 +1,13 @@
 # Ultralytics YOLO 🚀, AGPL-3.0 license
 
 import contextlib
+import re
 import shutil
 import subprocess
 from itertools import repeat
 from multiprocessing.pool import ThreadPool
 from pathlib import Path
 from urllib import parse, request
-from zipfile import BadZipFile, ZipFile, is_zipfile
 
 import requests
 import torch
@@ -39,7 +39,45 @@ def is_url(url, check=True):
     return False
 
 
-def unzip_file(file, path=None, exclude=('.DS_Store', '__MACOSX'), exist_ok=False):
+def zip_directory(directory, compress=True, exclude=('.DS_Store', '__MACOSX'), progress=True):
+    """
+    Zips the contents of a directory, excluding files containing strings in the exclude list.
+    The resulting zip file is named after the directory and placed alongside it.
+
+    Args:
+        directory (str | Path): The path to the directory to be zipped.
+        compress (bool): Whether to compress the files while zipping. Default is True.
+        exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX').
+        progress (bool, optional): Whether to display a progress bar. Defaults to True.
+
+    Returns:
+        (Path): The path to the resulting zip file.
+
+    Example:
+        ```python
+        from ultralytics.utils.downloads import zip_directory
+
+        file = zip_directory('path/to/dir')
+        ```
+    """
+    from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile
+
+    directory = Path(directory)
+    if not directory.is_dir():
+        raise FileNotFoundError(f"Directory '{directory}' does not exist.")
+
+    # Zip with progress bar
+    files_to_zip = [f for f in directory.rglob('*') if f.is_file() and not any(x in f.name for x in exclude)]
+    zip_file = directory.with_suffix('.zip')
+    compression = ZIP_DEFLATED if compress else ZIP_STORED
+    with ZipFile(zip_file, 'w', compression) as f:
+        for file in tqdm(files_to_zip, desc=f'Zipping {directory} to {zip_file}...', unit='file', disable=not progress):
+            f.write(file, file.relative_to(directory))
+
+    return zip_file  # return path to zip file
+
+
+def unzip_file(file, path=None, exclude=('.DS_Store', '__MACOSX'), exist_ok=False, progress=True):
     """
     Unzips a *.zip file to the specified path, excluding files containing strings in the exclude list.
 
@@ -52,13 +90,23 @@ def unzip_file(file, path=None, exclude=('.DS_Store', '__MACOSX'), exist_ok=Fals
         path (str, optional): The path to extract the zipfile to. Defaults to None.
         exclude (tuple, optional): A tuple of filename strings to be excluded. Defaults to ('.DS_Store', '__MACOSX').
         exist_ok (bool, optional): Whether to overwrite existing contents if they exist. Defaults to False.
+        progress (bool, optional): Whether to display a progress bar. Defaults to True.
 
     Raises:
         BadZipFile: If the provided file does not exist or is not a valid zipfile.
 
     Returns:
         (Path): The path to the directory where the zipfile was extracted.
+
+    Example:
+        ```python
+        from ultralytics.utils.downloads import unzip_file
+
+        dir = unzip_file('path/to/file.zip')
+        ```
     """
+    from zipfile import BadZipFile, ZipFile, is_zipfile
+
     if not (Path(file).exists() and is_zipfile(file)):
         raise BadZipFile(f"File '{file}' does not exist or is a bad zip file.")
     if path is None:
@@ -66,10 +114,10 @@ def unzip_file(file, path=None, exclude=('.DS_Store', '__MACOSX'), exist_ok=Fals
 
     # Unzip the file contents
     with ZipFile(file) as zipObj:
-        file_list = [f for f in zipObj.namelist() if all(x not in f for x in exclude)]
-        top_level_dirs = {Path(f).parts[0] for f in file_list}
+        files = [f for f in zipObj.namelist() if all(x not in f for x in exclude)]
+        top_level_dirs = {Path(f).parts[0] for f in files}
 
-        if len(top_level_dirs) > 1 or not file_list[0].endswith('/'):
+        if len(top_level_dirs) > 1 or not files[0].endswith('/'):
             path = Path(path) / Path(file).stem  # define new unzip directory
 
         # Check if destination directory already exists and contains files
@@ -79,7 +127,7 @@ def unzip_file(file, path=None, exclude=('.DS_Store', '__MACOSX'), exist_ok=Fals
             LOGGER.info(f'Skipping {file} unzip (already unzipped)')
             return path
 
-        for f in file_list:
+        for f in tqdm(files, desc=f'Unzipping {file} to {Path(path).resolve()}...', unit='file', disable=not progress):
             zipObj.extract(f, path=path)
 
     return path  # return unzip dir
@@ -117,6 +165,48 @@ def check_disk_space(url='https://ultralytics.com/assets/coco128.zip', sf=1.5, h
     return True
 
 
+def get_google_drive_file_info(link):
+    """
+    Retrieves the direct download link and filename for a shareable Google Drive file link.
+
+    Args:
+        link (str): The shareable link of the Google Drive file.
+
+    Returns:
+        (str): Direct download URL for the Google Drive file.
+        (str): Original filename of the Google Drive file. If filename extraction fails, returns None.
+
+    Example:
+        ```python
+        from ultralytics.utils.downloads import get_google_drive_file_info
+
+        link = "https://drive.google.com/file/d/1cqT-cJgANNrhIHCrEufUYhQ4RqiWG_lJ/view?usp=drive_link"
+        url, filename = get_google_drive_file_info(link)
+        ```
+    """
+    file_id = link.split('/d/')[1].split('/view')[0]
+    drive_url = f'https://drive.google.com/uc?export=download&id={file_id}'
+
+    # Start session
+    filename = None
+    with requests.Session() as session:
+        response = session.get(drive_url, stream=True)
+        if 'quota exceeded' in str(response.content.lower()):
+            raise ConnectionError(
+                emojis(f'❌ Google Drive file download quota exceeded. '
+                       f'Please try again later or download this file manually at {link}.'))
+        token = None
+        for key, value in response.cookies.items():
+            if key.startswith('download_warning'):
+                token = value
+        if token:
+            drive_url = f'https://drive.google.com/uc?export=download&confirm={token}&id={file_id}'
+        cd = response.headers.get('content-disposition')
+        if cd:
+            filename = re.findall('filename="(.+)"', cd)[0]
+    return drive_url, filename
+
+
 def safe_download(url,
                   file=None,
                   dir=None,
@@ -143,13 +233,18 @@ def safe_download(url,
             a successful download. Default: 1E0.
         progress (bool, optional): Whether to display a progress bar during the download. Default: True.
     """
-    f = dir / url2file(url) if dir else Path(file)  # URL converted to filename
+
+    # Check if the URL is a Google Drive link
+    gdrive = 'drive.google.com' in url
+    if gdrive:
+        url, file = get_google_drive_file_info(url)
+
+    f = dir / (file if gdrive else url2file(url)) if dir else Path(file)  # URL converted to filename
     if '://' not in str(url) and Path(url).is_file():  # URL exists ('://' check required in Windows Python<3.10)
         f = Path(url)  # filename
     elif not f.is_file():  # URL and file do not exist
         assert dir or file, 'dir or file required for download'
-        f = dir / url2file(url) if dir else Path(file)
-        desc = f"Downloading {clean_url(url)} to '{f}'"
+        desc = f"Downloading {url if gdrive else clean_url(url)} to '{f}'"
         LOGGER.info(f'{desc}...')
         f.parent.mkdir(parents=True, exist_ok=True)  # make directory if missing
         check_disk_space(url)
@@ -189,14 +284,14 @@ def safe_download(url,
                 LOGGER.warning(f'⚠️ Download failure, retrying {i + 1}/{retry} {url}...')
 
     if unzip and f.exists() and f.suffix in ('', '.zip', '.tar', '.gz'):
+        from zipfile import is_zipfile
+
         unzip_dir = dir or f.parent  # unzip to dir if provided else unzip in place
-        LOGGER.info(f'Unzipping {f} to {unzip_dir.absolute()}...')
         if is_zipfile(f):
             unzip_dir = unzip_file(file=f, path=unzip_dir)  # unzip
-        elif f.suffix == '.tar':
-            subprocess.run(['tar', 'xf', f, '--directory', unzip_dir], check=True)  # unzip
-        elif f.suffix == '.gz':
-            subprocess.run(['tar', 'xfz', f, '--directory', unzip_dir], check=True)  # unzip
+        elif f.suffix in ('.tar', '.gz'):
+            LOGGER.info(f'Unzipping {f} to {unzip_dir.resolve()}...')
+            subprocess.run(['tar', 'xf' if f.suffix == '.tar' else 'xfz', f, '--directory', unzip_dir], check=True)
         if delete:
            f.unlink()  # remove zip
         return unzip_dir
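
Below the patch is a minimal usage sketch of the behaviour added above, for reviewers only; it is not part of the diff. The Google Drive link, file ID and local paths are placeholders, and it assumes `safe_download`'s existing defaults (such as `unzip=True`) are unchanged by this patch.

```python
from pathlib import Path

from ultralytics.utils.downloads import safe_download, unzip_file, zip_directory

# Download a shared Google Drive file; safe_download() now resolves the direct
# download URL and original filename via get_google_drive_file_info().
# '<FILE_ID>' and 'tmp' are placeholders.
safe_download('https://drive.google.com/file/d/<FILE_ID>/view?usp=drive_link', dir=Path('tmp'))

# Zip a directory with a progress bar, then unzip it again with a progress bar.
zip_path = zip_directory('tmp')  # -> Path('tmp.zip')
out_dir = unzip_file(zip_path, path='tmp_restored', progress=True)
```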
|
|
|
|