Module licenseware.uploader_validator.uploader_validator
Expand source code
import os
from typing import Tuple
from flask import Request
from licenseware.quota import Quota
from licenseware.utils.logger import log
from licenseware.common.constants import states
from .filename_validator import FileNameValidator
from .file_content_validator import FileContentValidator
class UploaderValidator(FileNameValidator, FileContentValidator):
"""
"""
def __init__(
self,
filename_contains:list = [],
filename_endswith:list = [],
ignore_filenames:list = [],
required_input_type:str = None,
required_sheets:list = [],
required_columns:list = [],
text_contains_all:list = [],
text_contains_any:list = [],
regex_escape:bool = True,
min_rows_number:int = 0,
header_starts_at:int = 0,
buffer:int = 9000,
filename_valid_message = "File is valid",
filename_invalid_message = None,
filename_ignored_message = "File is ignored",
ignored_by_uup:bool = False, # ignored from universal uploader matcher
_uploader_id:str = None,
_quota_units:int = None,
**options
):
self.quota_units = _quota_units
self.uploader_id = _uploader_id
self.filename_contains = filename_contains
self.filename_endswith = filename_endswith
self.ignore_filenames = ignore_filenames
self.required_input_type = required_input_type
self.required_sheets = required_sheets
self.required_columns = required_columns
self.text_contains_all = text_contains_all
self.text_contains_any = text_contains_any
self.regex_escape = regex_escape
self.min_rows_number = min_rows_number
self.header_starts_at = header_starts_at
self.buffer = buffer
self.filename_valid_message = filename_valid_message
self.filename_invalid_message = filename_invalid_message
self.filename_ignored_message = filename_ignored_message
self.ignored_by_uup = ignored_by_uup
self.options = options
self.validation_parameters = self.get_validation_parameters()
super().__init__(**vars(self))
def quota_within_limits(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]:
q = Quota(
tenant_id=tenant_id,
auth_token=auth_token,
uploader_id=self.uploader_id,
units=self.quota_units
)
response, status_code = q.check_quota(units)
return response, status_code
def update_quota(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]:
q = Quota(
tenant_id=tenant_id,
auth_token=auth_token,
uploader_id=self.uploader_id,
units=self.quota_units
)
response, status_code = q.update_quota(units)
return response, status_code
def calculate_quota(self, flask_request: Request, update_quota_units: bool = True) -> Tuple[dict, int]:
if self.quota_units is None:
return {'status': states.SUCCESS, 'message': 'Quota is skipped'}, 200
log.warning("Calculating quota based on length of files")
tenant_id = flask_request.headers.get('Tenantid')
auth_token = flask_request.headers.get('Authorization')
file_objects = flask_request.files.getlist("files[]")
current_units_to_process = len(file_objects)
quota_check_response, quota_check_status = \
self.quota_within_limits(tenant_id, auth_token, current_units_to_process)
log.info(quota_check_response)
if quota_check_status == 200 and update_quota_units == True:
response, status_code = self.update_quota(tenant_id, auth_token, current_units_to_process)
log.info(response)
return quota_check_response, quota_check_status
@classmethod
def get_filepaths_from_objects_response(cls, file_objects_response):
file_paths = [
res['filepath']
for res in file_objects_response['validation']
]
return file_paths
@classmethod
def get_only_valid_filepaths_from_objects_response(cls, file_objects_response):
file_paths = [
res['filepath']
for res in file_objects_response['validation']
if res['filepath'] != 'File not saved' and os.path.exists(res['filepath'])
]
return file_paths
def get_validation_parameters(self):
return dict(
filename_contains = self.filename_contains,
filename_endswith = self.filename_endswith,
ignore_filenames = self.ignore_filenames,
required_input_type = self.required_input_type,
required_sheets = self.required_sheets,
required_columns = self.required_columns,
text_contains_all = self.text_contains_all,
text_contains_any = self.text_contains_any,
min_rows_number = self.min_rows_number,
header_starts_at = self.header_starts_at,
buffer = self.buffer,
filename_valid_message = self.filename_valid_message,
filename_invalid_message = self.filename_invalid_message,
filename_ignored_message = self.filename_ignored_message,
regex_escape = self.regex_escape,
ignored_by_uup = self.ignored_by_uup
)
def get_full_validation_response(self, filepath:str):
filename = os.path.basename(filepath)
filename_validation_response = self.validate_filenames([filename])
file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS
file_response = filename_validation_response[0]['message']
content_validation_response = self.validate_filepaths_content([filepath])
log.info(content_validation_response)
file_content_ok = content_validation_response[0]['status'] == states.SUCCESS
content_response = content_validation_response[0]['message']
log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}")
log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}")
fileok = all([file_name_ok, file_content_ok])
message = f"{'Name: ' + file_response if not file_name_ok else ''}{' Content: ' + content_response if not file_content_ok else ''}".strip()
return {
"status": states.SUCCESS if fileok else states.SKIPPED,
"name": file_name_ok,
"content": file_content_ok,
"message": message or "File is valid"
}
def valid_filepath(self, filepath:str):
filename = os.path.basename(filepath)
filename_validation_response = self.validate_filenames([filename])
file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS
file_content_ok = False
if file_name_ok:
content_validation_response = self.validate_filepaths_content([filepath])
log.info(content_validation_response)
file_content_ok = content_validation_response[0]['status'] == states.SUCCESS
log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}")
log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}")
return all([file_name_ok, file_content_ok])
Classes
class UploaderValidator (filename_contains: list = [], filename_endswith: list = [], ignore_filenames: list = [], required_input_type: str = None, required_sheets: list = [], required_columns: list = [], text_contains_all: list = [], text_contains_any: list = [], regex_escape: bool = True, min_rows_number: int = 0, header_starts_at: int = 0, buffer: int = 9000, filename_valid_message='File is valid', filename_invalid_message=None, filename_ignored_message='File is ignored', ignored_by_uup: bool = False, **options)
-
Expand source code
class UploaderValidator(FileNameValidator, FileContentValidator): """ """ def __init__( self, filename_contains:list = [], filename_endswith:list = [], ignore_filenames:list = [], required_input_type:str = None, required_sheets:list = [], required_columns:list = [], text_contains_all:list = [], text_contains_any:list = [], regex_escape:bool = True, min_rows_number:int = 0, header_starts_at:int = 0, buffer:int = 9000, filename_valid_message = "File is valid", filename_invalid_message = None, filename_ignored_message = "File is ignored", ignored_by_uup:bool = False, # ignored from universal uploader matcher _uploader_id:str = None, _quota_units:int = None, **options ): self.quota_units = _quota_units self.uploader_id = _uploader_id self.filename_contains = filename_contains self.filename_endswith = filename_endswith self.ignore_filenames = ignore_filenames self.required_input_type = required_input_type self.required_sheets = required_sheets self.required_columns = required_columns self.text_contains_all = text_contains_all self.text_contains_any = text_contains_any self.regex_escape = regex_escape self.min_rows_number = min_rows_number self.header_starts_at = header_starts_at self.buffer = buffer self.filename_valid_message = filename_valid_message self.filename_invalid_message = filename_invalid_message self.filename_ignored_message = filename_ignored_message self.ignored_by_uup = ignored_by_uup self.options = options self.validation_parameters = self.get_validation_parameters() super().__init__(**vars(self)) def quota_within_limits(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]: q = Quota( tenant_id=tenant_id, auth_token=auth_token, uploader_id=self.uploader_id, units=self.quota_units ) response, status_code = q.check_quota(units) return response, status_code def update_quota(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]: q = Quota( tenant_id=tenant_id, auth_token=auth_token, uploader_id=self.uploader_id, units=self.quota_units ) response, status_code = q.update_quota(units) return response, status_code def calculate_quota(self, flask_request: Request, update_quota_units: bool = True) -> Tuple[dict, int]: if self.quota_units is None: return {'status': states.SUCCESS, 'message': 'Quota is skipped'}, 200 log.warning("Calculating quota based on length of files") tenant_id = flask_request.headers.get('Tenantid') auth_token = flask_request.headers.get('Authorization') file_objects = flask_request.files.getlist("files[]") current_units_to_process = len(file_objects) quota_check_response, quota_check_status = \ self.quota_within_limits(tenant_id, auth_token, current_units_to_process) log.info(quota_check_response) if quota_check_status == 200 and update_quota_units == True: response, status_code = self.update_quota(tenant_id, auth_token, current_units_to_process) log.info(response) return quota_check_response, quota_check_status @classmethod def get_filepaths_from_objects_response(cls, file_objects_response): file_paths = [ res['filepath'] for res in file_objects_response['validation'] ] return file_paths @classmethod def get_only_valid_filepaths_from_objects_response(cls, file_objects_response): file_paths = [ res['filepath'] for res in file_objects_response['validation'] if res['filepath'] != 'File not saved' and os.path.exists(res['filepath']) ] return file_paths def get_validation_parameters(self): return dict( filename_contains = self.filename_contains, filename_endswith = self.filename_endswith, ignore_filenames = self.ignore_filenames, required_input_type = self.required_input_type, required_sheets = self.required_sheets, required_columns = self.required_columns, text_contains_all = self.text_contains_all, text_contains_any = self.text_contains_any, min_rows_number = self.min_rows_number, header_starts_at = self.header_starts_at, buffer = self.buffer, filename_valid_message = self.filename_valid_message, filename_invalid_message = self.filename_invalid_message, filename_ignored_message = self.filename_ignored_message, regex_escape = self.regex_escape, ignored_by_uup = self.ignored_by_uup ) def get_full_validation_response(self, filepath:str): filename = os.path.basename(filepath) filename_validation_response = self.validate_filenames([filename]) file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS file_response = filename_validation_response[0]['message'] content_validation_response = self.validate_filepaths_content([filepath]) log.info(content_validation_response) file_content_ok = content_validation_response[0]['status'] == states.SUCCESS content_response = content_validation_response[0]['message'] log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}") log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}") fileok = all([file_name_ok, file_content_ok]) message = f"{'Name: ' + file_response if not file_name_ok else ''}{' Content: ' + content_response if not file_content_ok else ''}".strip() return { "status": states.SUCCESS if fileok else states.SKIPPED, "name": file_name_ok, "content": file_content_ok, "message": message or "File is valid" } def valid_filepath(self, filepath:str): filename = os.path.basename(filepath) filename_validation_response = self.validate_filenames([filename]) file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS file_content_ok = False if file_name_ok: content_validation_response = self.validate_filepaths_content([filepath]) log.info(content_validation_response) file_content_ok = content_validation_response[0]['status'] == states.SUCCESS log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}") log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}") return all([file_name_ok, file_content_ok])
Ancestors
Static methods
def get_filepaths_from_objects_response(file_objects_response)
-
Expand source code
@classmethod def get_filepaths_from_objects_response(cls, file_objects_response): file_paths = [ res['filepath'] for res in file_objects_response['validation'] ] return file_paths
def get_only_valid_filepaths_from_objects_response(file_objects_response)
-
Expand source code
@classmethod def get_only_valid_filepaths_from_objects_response(cls, file_objects_response): file_paths = [ res['filepath'] for res in file_objects_response['validation'] if res['filepath'] != 'File not saved' and os.path.exists(res['filepath']) ] return file_paths
Methods
def calculate_quota(self, flask_request: flask.wrappers.Request, update_quota_units: bool = True) ‑> Tuple[dict, int]
-
Expand source code
def calculate_quota(self, flask_request: Request, update_quota_units: bool = True) -> Tuple[dict, int]: if self.quota_units is None: return {'status': states.SUCCESS, 'message': 'Quota is skipped'}, 200 log.warning("Calculating quota based on length of files") tenant_id = flask_request.headers.get('Tenantid') auth_token = flask_request.headers.get('Authorization') file_objects = flask_request.files.getlist("files[]") current_units_to_process = len(file_objects) quota_check_response, quota_check_status = \ self.quota_within_limits(tenant_id, auth_token, current_units_to_process) log.info(quota_check_response) if quota_check_status == 200 and update_quota_units == True: response, status_code = self.update_quota(tenant_id, auth_token, current_units_to_process) log.info(response) return quota_check_response, quota_check_status
def get_full_validation_response(self, filepath: str)
-
Expand source code
def get_full_validation_response(self, filepath:str): filename = os.path.basename(filepath) filename_validation_response = self.validate_filenames([filename]) file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS file_response = filename_validation_response[0]['message'] content_validation_response = self.validate_filepaths_content([filepath]) log.info(content_validation_response) file_content_ok = content_validation_response[0]['status'] == states.SUCCESS content_response = content_validation_response[0]['message'] log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}") log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}") fileok = all([file_name_ok, file_content_ok]) message = f"{'Name: ' + file_response if not file_name_ok else ''}{' Content: ' + content_response if not file_content_ok else ''}".strip() return { "status": states.SUCCESS if fileok else states.SKIPPED, "name": file_name_ok, "content": file_content_ok, "message": message or "File is valid" }
def get_validation_parameters(self)
-
Expand source code
def get_validation_parameters(self): return dict( filename_contains = self.filename_contains, filename_endswith = self.filename_endswith, ignore_filenames = self.ignore_filenames, required_input_type = self.required_input_type, required_sheets = self.required_sheets, required_columns = self.required_columns, text_contains_all = self.text_contains_all, text_contains_any = self.text_contains_any, min_rows_number = self.min_rows_number, header_starts_at = self.header_starts_at, buffer = self.buffer, filename_valid_message = self.filename_valid_message, filename_invalid_message = self.filename_invalid_message, filename_ignored_message = self.filename_ignored_message, regex_escape = self.regex_escape, ignored_by_uup = self.ignored_by_uup )
def quota_within_limits(self, tenant_id: str, auth_token: str, units: int) ‑> Tuple[dict, int]
-
Expand source code
def quota_within_limits(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]: q = Quota( tenant_id=tenant_id, auth_token=auth_token, uploader_id=self.uploader_id, units=self.quota_units ) response, status_code = q.check_quota(units) return response, status_code
def update_quota(self, tenant_id: str, auth_token: str, units: int) ‑> Tuple[dict, int]
-
Expand source code
def update_quota(self, tenant_id:str, auth_token:str, units: int) -> Tuple[dict, int]: q = Quota( tenant_id=tenant_id, auth_token=auth_token, uploader_id=self.uploader_id, units=self.quota_units ) response, status_code = q.update_quota(units) return response, status_code
def valid_filepath(self, filepath: str)
-
Expand source code
def valid_filepath(self, filepath:str): filename = os.path.basename(filepath) filename_validation_response = self.validate_filenames([filename]) file_name_ok = filename_validation_response[0]['status'] == states.SUCCESS file_content_ok = False if file_name_ok: content_validation_response = self.validate_filepaths_content([filepath]) log.info(content_validation_response) file_content_ok = content_validation_response[0]['status'] == states.SUCCESS log.info(f"{filename} - file_name_ok:{file_name_ok}, file_content_ok:{file_content_ok}") log.info(f"{self.uploader_id} - {all([file_name_ok, file_content_ok])}") return all([file_name_ok, file_content_ok])
Inherited members