|
import os |
|
import os.path |
|
import hashlib |
|
import errno |
|
from tqdm import tqdm |
|
|
|
|
|
def gen_bar_updater(pbar): |
|
def bar_update(count, block_size, total_size): |
|
if pbar.total is None and total_size: |
|
pbar.total = total_size |
|
progress_bytes = count * block_size |
|
pbar.update(progress_bytes - pbar.n) |
|
|
|
return bar_update |
|
|
|
|
|
def check_integrity(fpath, md5=None): |
|
if md5 is None: |
|
return True |
|
if not os.path.isfile(fpath): |
|
return False |
|
md5o = hashlib.md5() |
|
with open(fpath, 'rb') as f: |
|
|
|
for chunk in iter(lambda: f.read(1024 * 1024), b''): |
|
md5o.update(chunk) |
|
md5c = md5o.hexdigest() |
|
if md5c != md5: |
|
return False |
|
return True |
|
|
|
|
|
def makedir_exist_ok(dirpath): |
|
""" |
|
Python2 support for os.makedirs(.., exist_ok=True) |
|
""" |
|
try: |
|
os.makedirs(dirpath) |
|
except OSError as e: |
|
if e.errno == errno.EEXIST: |
|
pass |
|
else: |
|
raise |
|
|
|
|
|
def download_url(url, root, filename=None, md5=None): |
|
"""Download a file from a url and place it in root. |
|
Args: |
|
url (str): URL to download file from |
|
root (str): Directory to place downloaded file in |
|
filename (str): Name to save the file under. If None, use the basename of the URL |
|
md5 (str): MD5 checksum of the download. If None, do not check |
|
""" |
|
from six.moves import urllib |
|
|
|
root = os.path.expanduser(root) |
|
if not filename: |
|
filename = os.path.basename(url) |
|
fpath = os.path.join(root, filename) |
|
|
|
makedir_exist_ok(root) |
|
|
|
|
|
if os.path.isfile(fpath) and check_integrity(fpath, md5): |
|
print('Using downloaded and verified file: ' + fpath) |
|
else: |
|
try: |
|
print('Downloading ' + url + ' to ' + fpath) |
|
urllib.request.urlretrieve( |
|
url, fpath, |
|
reporthook=gen_bar_updater(tqdm(unit='B', unit_scale=True)) |
|
) |
|
except OSError: |
|
if url[:5] == 'https': |
|
url = url.replace('https:', 'http:') |
|
print('Failed download. Trying https -> http instead.' |
|
' Downloading ' + url + ' to ' + fpath) |
|
urllib.request.urlretrieve( |
|
url, fpath, |
|
reporthook=gen_bar_updater(tqdm(unit='B', unit_scale=True)) |
|
) |
|
|
|
|
|
def list_dir(root, prefix=False): |
|
"""List all directories at a given root |
|
Args: |
|
root (str): Path to directory whose folders need to be listed |
|
prefix (bool, optional): If true, prepends the path to each result, otherwise |
|
only returns the name of the directories found |
|
""" |
|
root = os.path.expanduser(root) |
|
directories = list( |
|
filter( |
|
lambda p: os.path.isdir(os.path.join(root, p)), |
|
os.listdir(root) |
|
) |
|
) |
|
|
|
if prefix is True: |
|
directories = [os.path.join(root, d) for d in directories] |
|
|
|
return directories |
|
|
|
|
|
def list_files(root, suffix, prefix=False): |
|
"""List all files ending with a suffix at a given root |
|
Args: |
|
root (str): Path to directory whose folders need to be listed |
|
suffix (str or tuple): Suffix of the files to match, e.g. '.png' or ('.jpg', '.png'). |
|
It uses the Python "str.endswith" method and is passed directly |
|
prefix (bool, optional): If true, prepends the path to each result, otherwise |
|
only returns the name of the files found |
|
""" |
|
root = os.path.expanduser(root) |
|
files = list( |
|
filter( |
|
lambda p: os.path.isfile(os.path.join(root, p)) and p.endswith(suffix), |
|
os.listdir(root) |
|
) |
|
) |
|
|
|
if prefix is True: |
|
files = [os.path.join(root, d) for d in files] |
|
|
|
return files |