Module licenseware.utils.file_utils
Expand source code
import os, re
import shutil
from licenseware.common.constants import envs
from licenseware.utils.logger import log
from werkzeug.utils import secure_filename as werkzeug_secure_filename
accepted_archives = ('.zip', '.tar', '.tar.bz2', '.tar.gz', '.tgz',)
def is_archive(filepath: str):
return filepath.endswith(accepted_archives)
def list_files_from_path(dir_path: str):
filepaths = []
for root, dirs, filenames in os.walk(dir_path):
for filename in filenames:
filepaths.append(os.path.join(root, filename))
return filepaths
def unzip(file_path: str):
"""
Extracts: “zip”, “tar”, “gztar”, “bztar”, or “xztar”.
Returns the path where the file was extracted
"""
if not is_archive(file_path):
raise ValueError(f"Invalid archive type, currently accepting: {accepted_archives}")
file_name = os.path.basename(file_path)
file_dir = os.path.dirname(file_path)
extract_path = os.path.join(file_dir, file_name + "_extracted")
shutil.unpack_archive(file_path, extract_path)
return extract_path
def recursive_unzip(file_path: str):
"""
Iterate over an archive and recursively extract all archives using unzip function
"""
if is_archive(file_path):
unziped_base = unzip(file_path)
if unziped_base is None: return
else:
unziped_base = file_path
archives_found = False
for root, dirs, filenames in os.walk(unziped_base):
for filename in filenames:
fpath = os.path.join(root, filename)
if not is_archive(fpath): continue
if not os.path.exists(fpath): continue
unzipped_path = unzip(fpath)
if unzipped_path is None: continue
os.remove(fpath)
archives_found = True
file_path = file_path + '_extracted' if not file_path.endswith('_extracted') else file_path
if archives_found:
recursive_unzip(file_path)
return file_path
def secure_filename(fname):
"""
Custom handling of secure filename
Added an exception for review lite files `~` char
"""
review_lite_file = re.search(r"feature|option|version", fname, re.IGNORECASE)
if review_lite_file: fname = re.sub('~', '_SECURE_TILDA_', fname)
fname = werkzeug_secure_filename(fname)
if review_lite_file: fname = re.sub('_SECURE_TILDA_', '~', fname)
return fname
def save_file(file, tenant_id=None, path=None):
"""
Save to disk flask file stream
"""
if envs.DESKTOP_ENVIRONMENT and tenant_id is None:
tenant_id = envs.DESKTOP_TENANT_ID
filename = secure_filename(file.filename)
# dir_id = generate_id() # doing this to avoid overwriting already uploaded files
# dir_id works only for uploaders with one file sent for processing
save_path = path or os.path.join(envs.FILE_UPLOAD_PATH, tenant_id)
if not os.path.exists(save_path): os.makedirs(save_path)
try:
file.seek(0) # move cursor to 0 (stream left it on last read)
except ValueError:
log.error("File is not a stream, can't seek")
file_path = os.path.join(save_path, filename)
file.save(file_path)
file.close()
return file_path
Functions
def is_archive(filepath: str)
-
Expand source code
def is_archive(filepath: str): return filepath.endswith(accepted_archives)
def list_files_from_path(dir_path: str)
-
Expand source code
def list_files_from_path(dir_path: str): filepaths = [] for root, dirs, filenames in os.walk(dir_path): for filename in filenames: filepaths.append(os.path.join(root, filename)) return filepaths
def recursive_unzip(file_path: str)
-
Iterate over an archive and recursively extract all archives using unzip function
Expand source code
def recursive_unzip(file_path: str): """ Iterate over an archive and recursively extract all archives using unzip function """ if is_archive(file_path): unziped_base = unzip(file_path) if unziped_base is None: return else: unziped_base = file_path archives_found = False for root, dirs, filenames in os.walk(unziped_base): for filename in filenames: fpath = os.path.join(root, filename) if not is_archive(fpath): continue if not os.path.exists(fpath): continue unzipped_path = unzip(fpath) if unzipped_path is None: continue os.remove(fpath) archives_found = True file_path = file_path + '_extracted' if not file_path.endswith('_extracted') else file_path if archives_found: recursive_unzip(file_path) return file_path
def save_file(file, tenant_id=None, path=None)
-
Save to disk flask file stream
Expand source code
def save_file(file, tenant_id=None, path=None): """ Save to disk flask file stream """ if envs.DESKTOP_ENVIRONMENT and tenant_id is None: tenant_id = envs.DESKTOP_TENANT_ID filename = secure_filename(file.filename) # dir_id = generate_id() # doing this to avoid overwriting already uploaded files # dir_id works only for uploaders with one file sent for processing save_path = path or os.path.join(envs.FILE_UPLOAD_PATH, tenant_id) if not os.path.exists(save_path): os.makedirs(save_path) try: file.seek(0) # move cursor to 0 (stream left it on last read) except ValueError: log.error("File is not a stream, can't seek") file_path = os.path.join(save_path, filename) file.save(file_path) file.close() return file_path
def secure_filename(fname)
-
Custom handling of secure filename Added an exception for review lite files
~
charExpand source code
def secure_filename(fname): """ Custom handling of secure filename Added an exception for review lite files `~` char """ review_lite_file = re.search(r"feature|option|version", fname, re.IGNORECASE) if review_lite_file: fname = re.sub('~', '_SECURE_TILDA_', fname) fname = werkzeug_secure_filename(fname) if review_lite_file: fname = re.sub('_SECURE_TILDA_', '~', fname) return fname
def unzip(file_path: str)
-
Extracts: “zip”, “tar”, “gztar”, “bztar”, or “xztar”.
Returns the path where the file was extractedExpand source code
def unzip(file_path: str): """ Extracts: “zip”, “tar”, “gztar”, “bztar”, or “xztar”. Returns the path where the file was extracted """ if not is_archive(file_path): raise ValueError(f"Invalid archive type, currently accepting: {accepted_archives}") file_name = os.path.basename(file_path) file_dir = os.path.dirname(file_path) extract_path = os.path.join(file_dir, file_name + "_extracted") shutil.unpack_archive(file_path, extract_path) return extract_path