Module licenseware.history.step

Expand source code
import os
import shutil
import datetime
from licenseware import mongodata
from licenseware.common.constants import envs, states
from .history_schemas import HistorySchema
from licenseware.utils.logger import log as logg


def save_filename_validation(metadata, response):

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "filename_validation": response["validation"],
        "filename_validation_updated_at": datetime.datetime.utcnow().isoformat(),
        "updated_at": datetime.datetime.utcnow().isoformat()
    }

    return mongodata.insert(
        schema=HistorySchema,
        data=data,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )


def copy_files_uploaded_on_event_folder(data):
    """
        Files uploaded are saved in another folder for the purpose of replicating the errors
        Files will be deleted after 1 month (iso date specifies when files will be deleted)
    """
    if envs.DESKTOP_ENVIRONMENT:
        return []
        
    expiration_iso_date = (datetime.datetime.utcnow() + datetime.timedelta(days=30)).date().isoformat()
    folder_name = f"{data['tenant_id']}_{data['event_id']}_{expiration_iso_date}"
    folder_path = os.path.join(envs.FILE_UPLOAD_PATH, folder_name)

    if not os.path.exists(folder_path): os.makedirs(folder_path)

    files_uploaded_on_event = []
    for fp in data["files_uploaded"]:
        save_path = os.path.join(folder_path, os.path.basename(fp))
        shutil.copy2(src=fp, dst=save_path)
        files_uploaded_on_event.append(save_path)

    return files_uploaded_on_event


def save_file_content_validation(metadata, response):

    if "event_data" not in response:
        logg.info("Parameter `event_data` not found on response from `upload_files`")
        return

    file_content_validation = []
    for cv in response["event_data"]:
        file_content_validation.extend(cv["validation_response"]['validation'])

    filepaths = []
    for cv in response["event_data"]:
        filepaths.extend(cv["filepaths"])

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "file_content_validation": file_content_validation,
        "files_uploaded": filepaths,
        "updated_at": datetime.datetime.utcnow().isoformat()
    }

    data["files_uploaded"] = copy_files_uploaded_on_event_folder(data)
    data["file_content_validation_updated_at"] = datetime.datetime.utcnow().isoformat()

    return mongodata.update(
        schema=HistorySchema,
        match={
            "tenant_id": metadata["tenant_id"],
            "event_id": metadata["event_id"]
        },
        new_data=data,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )


def save_processing_details(metadata, response):

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "updated_at": datetime.datetime.utcnow().isoformat(),
        "processing_details": [{
            "step": metadata['step'],
            "filepath": metadata["filepath"],
            "status": response["status"],
            "success": response["success"],
            "error": response["error"],
            "traceback": response["traceback"],
            "callable": metadata['callable'],
            "source": metadata['source'],
            "file_name": metadata['file_name'],
            "updated_at": datetime.datetime.utcnow().isoformat()
        }]
    }

    return mongodata.update(
        schema=HistorySchema,
        match={
            "tenant_id": metadata["tenant_id"],
            "event_id": metadata["event_id"]
        },
        new_data=data,
        append=True,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )


def save_step(
        metadata,
        response,
        on_success_save: any = None,
        on_failure_save: any = None,
        raised_error: bool = False
):
    # We can't track files without an event_id
    if metadata['event_id'] is None: return

    if metadata['callable'] == 'validate_filenames':
        return save_filename_validation(metadata, response[0])

    if metadata['callable'] == 'upload_files':
        return save_file_content_validation(metadata, response[0])

    # Success cases
    if not raised_error and on_success_save:
        return save_processing_details(metadata, {
            "status": states.SUCCESS,
            "success": on_success_save,
            "error": None,
            "traceback": None
        })
    if not raised_error and not on_success_save:
        return save_processing_details(metadata, {
            "status": states.SUCCESS,
            "success": None,
            "error": None,
            "traceback": None
        })

    # Failed cases
    if raised_error and on_failure_save:
        return save_processing_details(metadata, {
            "status": states.FAILED,
            "success": None,
            "error": on_failure_save,
            "traceback": response['traceback']
        })

    if raised_error and not on_failure_save:
        return save_processing_details(metadata, {
            "status": states.FAILED,
            "success": None,
            "error": response['error'],
            "traceback": response['traceback']
        })









# File validation response
# {
#   "status": "success",
#   "message": "Filenames are valid",
#   "validation": [
#     {
#       "status": "success",
#       "filename": "rvtools.xlsx",
#       "message": "Filename is valid"
#     },
#     {
#       "status": "success",
#       "filename": "options.csv",
#       "message": "Filename is valid"
#     }
#   ]
# }

# Upload files response
# {
#   "status": "success",
#   "message": "Event sent",
#   "event_data": {
#     "tenant_id": "b37761e3-6926-4cc1-88c7-4d0478b04adf",
#     "uploader_id": "universal_uploader",
#     "filepaths": [
#       "/tmp/lware/b37761e3-6926-4cc1-88c7-4d0478b04adf/cpuq.txt"
#     ],
#     "flask_request": {
#       "Host": "localhost",
#       "Connection": "keep-alive",
#       "X-Forwarded-For": "172.18.0.1",
#       "X-Forwarded-Proto": "http",
#       "X-Forwarded-Host": "localhost",
#       "X-Forwarded-Port": "80",
#       "X-Forwarded-Path": "/universal-uploader/uploads/universal_uploader/files",
#       "X-Real-Ip": "172.18.0.1",
#       "Content-Length": "6655",
#       "Sec-Ch-Ua": "\" Not A;Brand\";v=\"99\", \"Chromium\";v=\"99\", \"Google Chrome\";v=\"99\"",
#       "Sec-Ch-Ua-Mobile": "?0",
#       "Authorization": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2NDczMjg4MzAsImlhdCI6MTY0NzI0MjQyNSwic3ViIjoiMzc0MmQ4ODgtNzNjNS00MTA1LTk4OTgtYjIwZjZhMmNlMjM1In0.3IrFRs4-VQiYzUCTc_QgZKQo8NVnbdSmRWmU4s7eRfE",
#       "Tenantid": "b37761e3-6926-4cc1-88c7-4d0478b04adf",
#       "Content-Type": "multipart/form-data; boundary=----WebKitFormBoundaryFy5k1hbwcuEa1dsf",
#       "Accept": "application/json",
#       "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36",
#       "Sec-Ch-Ua-Platform": "\"Linux\"",
#       "Origin": "http://localhost",
#       "Sec-Fetch-Site": "same-origin",
#       "Sec-Fetch-Mode": "cors",
#       "Sec-Fetch-Dest": "empty",
#       "Referer": "http://localhost/universal-uploader/docs",
#       "Accept-Encoding": "gzip, deflate, br",
#       "Accept-Language": "en-US,en;q=0.9,ro;q=0.8,it;q=0.7,fr;q=0.6",
#       "Cookie": "PGADMIN_LANGUAGE=en; username-localhost-8889=\"2|1:0|10:1646133817|23:username-localhost-8889|44:ZTZkMjJhN2NkZWM2NDNmZDkxYTVkNTVjYTg0NjRlM2E=|38b7f9e169bfa62662aa3ff1d146050f2964b793dc3902120ba90c4e20535626\"; username-localhost-8888=\"2|1:0|10:1646996386|23:username-localhost-8888|44:MmQzODg2ODhjY2E0NDY1Yzg1Y2FhOWUwY2EzYjRlZDY=|e72eb76bbecc87382940894602d81bb601977e231173ac0d2922d8fc4b1218b4\"; _xsrf=2|3ea9ea76|6ebac8ec76a4cd27fed85d0eb847f7bd|1646996386; mongo-express=s%3AVAettPkb_tNq0EZQcZqY96UKMb4vQ_s1.m02RdaZSFY5qVSrgPU%2FkfwHsrhojQX0P8gGOnjMZ1Ys"
#     },
#     "validation_response": {
#       "tenant_id": "b37761e3-6926-4cc1-88c7-4d0478b04adf",
#       "status": "success",
#       "message": "Files are valid",
#       "validation": [
#         {
#           "status": "success",
#           "filename": "cpuq.txt",
#           "filepath": "/tmp/lware/b37761e3-6926-4cc1-88c7-4d0478b04adf/cpuq.txt",
#           "message": "Filename is valid"
#         }
#       ]
#     }
#   }
# }




# {
#     'callable': 'validate_filenames_no_flask_request',
#     'docs': 'Validate filenames received', 'source':
#     '/home/acmt/Documents/lware/licenseware-sdk-v2/tests/test_history.py',
#     'tenant_id': '123',
#     'event_id': 'wer',
#     'app_id': 'app',
#     'uploader_id': 'rv_tools'
# }



# {
#     "tenant_id": "xxx",
#     "event_id": "xxxx",
#     "app_id": "",
#     "uploader_id": "rv_lite",
#     "entities_ids": [
#         "uuid4 strings"
#     ],
#     "filename_validation": [
#         {"file_path": "/path/file", "response": the filename validation response},
#         etc
#     ],
#     "file_content_validatio nr bulk file should look:": [
#         {"file_path": "/path/file", "response": the file content validation response},
#         etc
#     ],
#     "files_uploaded": [file_path1, file_path2, file_path3],
#     "processing_details": [
#             {
#                 "step": "Getting machines CPU cores",
#                 "error": "Value in column x not an integer",
#                 "processed_file_path": "path/to/file/processed",
#                 "traceback": "traceback error"
#             }
#             etc
#     ]
# }

Functions

def copy_files_uploaded_on_event_folder(data)

Files uploaded are saved in another folder for the purpose of replicating the errors Files will be deleted after 1 month (iso date specifies when files will be deleted)

Expand source code
def copy_files_uploaded_on_event_folder(data):
    """
        Files uploaded are saved in another folder for the purpose of replicating the errors
        Files will be deleted after 1 month (iso date specifies when files will be deleted)
    """
    if envs.DESKTOP_ENVIRONMENT:
        return []
        
    expiration_iso_date = (datetime.datetime.utcnow() + datetime.timedelta(days=30)).date().isoformat()
    folder_name = f"{data['tenant_id']}_{data['event_id']}_{expiration_iso_date}"
    folder_path = os.path.join(envs.FILE_UPLOAD_PATH, folder_name)

    if not os.path.exists(folder_path): os.makedirs(folder_path)

    files_uploaded_on_event = []
    for fp in data["files_uploaded"]:
        save_path = os.path.join(folder_path, os.path.basename(fp))
        shutil.copy2(src=fp, dst=save_path)
        files_uploaded_on_event.append(save_path)

    return files_uploaded_on_event
def save_file_content_validation(metadata, response)
Expand source code
def save_file_content_validation(metadata, response):

    if "event_data" not in response:
        logg.info("Parameter `event_data` not found on response from `upload_files`")
        return

    file_content_validation = []
    for cv in response["event_data"]:
        file_content_validation.extend(cv["validation_response"]['validation'])

    filepaths = []
    for cv in response["event_data"]:
        filepaths.extend(cv["filepaths"])

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "file_content_validation": file_content_validation,
        "files_uploaded": filepaths,
        "updated_at": datetime.datetime.utcnow().isoformat()
    }

    data["files_uploaded"] = copy_files_uploaded_on_event_folder(data)
    data["file_content_validation_updated_at"] = datetime.datetime.utcnow().isoformat()

    return mongodata.update(
        schema=HistorySchema,
        match={
            "tenant_id": metadata["tenant_id"],
            "event_id": metadata["event_id"]
        },
        new_data=data,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )
def save_filename_validation(metadata, response)
Expand source code
def save_filename_validation(metadata, response):

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "filename_validation": response["validation"],
        "filename_validation_updated_at": datetime.datetime.utcnow().isoformat(),
        "updated_at": datetime.datetime.utcnow().isoformat()
    }

    return mongodata.insert(
        schema=HistorySchema,
        data=data,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )
def save_processing_details(metadata, response)
Expand source code
def save_processing_details(metadata, response):

    data = {
        "tenant_id": metadata["tenant_id"],
        "event_id": metadata["event_id"],
        "app_id": metadata["app_id"],
        "uploader_id": metadata["uploader_id"],
        "updated_at": datetime.datetime.utcnow().isoformat(),
        "processing_details": [{
            "step": metadata['step'],
            "filepath": metadata["filepath"],
            "status": response["status"],
            "success": response["success"],
            "error": response["error"],
            "traceback": response["traceback"],
            "callable": metadata['callable'],
            "source": metadata['source'],
            "file_name": metadata['file_name'],
            "updated_at": datetime.datetime.utcnow().isoformat()
        }]
    }

    return mongodata.update(
        schema=HistorySchema,
        match={
            "tenant_id": metadata["tenant_id"],
            "event_id": metadata["event_id"]
        },
        new_data=data,
        append=True,
        collection=envs.MONGO_COLLECTION_HISTORY_NAME
    )
def save_step(metadata, response, on_success_save:  = None, on_failure_save:  = None, raised_error: bool = False)
Expand source code
def save_step(
        metadata,
        response,
        on_success_save: any = None,
        on_failure_save: any = None,
        raised_error: bool = False
):
    # We can't track files without an event_id
    if metadata['event_id'] is None: return

    if metadata['callable'] == 'validate_filenames':
        return save_filename_validation(metadata, response[0])

    if metadata['callable'] == 'upload_files':
        return save_file_content_validation(metadata, response[0])

    # Success cases
    if not raised_error and on_success_save:
        return save_processing_details(metadata, {
            "status": states.SUCCESS,
            "success": on_success_save,
            "error": None,
            "traceback": None
        })
    if not raised_error and not on_success_save:
        return save_processing_details(metadata, {
            "status": states.SUCCESS,
            "success": None,
            "error": None,
            "traceback": None
        })

    # Failed cases
    if raised_error and on_failure_save:
        return save_processing_details(metadata, {
            "status": states.FAILED,
            "success": None,
            "error": on_failure_save,
            "traceback": response['traceback']
        })

    if raised_error and not on_failure_save:
        return save_processing_details(metadata, {
            "status": states.FAILED,
            "success": None,
            "error": response['error'],
            "traceback": response['traceback']
        })