Module licenseware.uploader_encryptor.uploader_encryptor

Expand source code
import os
import re
import shutil
from typing import List, Dict
import pandas as pd
from licenseware.utils.aes import AESCipher
from licenseware.common.constants import envs


class UploaderEncryptor:

    """

    Encrypt and decrypt uploaded files (paths, text content and table columns) with AES.

    Every encrypted value is wrapped between `start_tag` and `end_tag` so that
    it can be located again when decrypting.

    Each entry of `filepaths` / `filecontent` is either a plain string, which is
    encrypted wherever it is found, or a regex with a capturing group, in which
    case only the captured text is encrypted.
    Ex:
    ```
        Input: "/Collection-deviceName"
        filepaths: ["Collection-(.+)"] will return something like:
        "/Collection-#sc#iGSS0a9GfPDJ24ni3vfmRSrPIYdY3kFj4-EsRjfz9E0=#ec#"
    ```

    Params:

    filepaths = ["Collection-(.+)", "DeviceName"] # parts of the filepath (including the filename) to encrypt
    filecontent = ["ODBName(.+)"] # text found with regex in the file content (csv, txt, xml), all occurrences are replaced
    columns = ["OS", "IP"] # encrypt the entire column data where the specified columns are found (csv, excel with multiple sheets)
    encryption_password = "secret password" # the password that will be used for encrypting and decrypting data
    You can also set password later with `set_password("password")`

    Usage:

    ```py

    from licenseware.uploader_encryptor import UploaderEncryptor

    filepaths = [
        "test_files/RVTools.xlsx",
        "test_files/LMS_OPTIONS_SECRET.csv",
        "test_files/cpuq.txt",
        "test_files/rl/deviceName_database_version.csv",
        "test_files/rl/deviceName_database_options.csv",
        "test_files/rl/deviceName_database_dba_feature.csv",
    ]

    ue = UploaderEncryptor(
        filepaths=["deviceName", "database", "LMS_OPTIONS_(.*?).csv", "rl"],
        filecontent=["Machine Name=(.+)", "System IP Address 1=(.+)"],
        columns=["DB_NAME", "MACHINE_ID", "HOST_NAME", "INSTANCE_NAME", "Host", "Device"]
    )

    ue.set_password("password")

    # Encrypt
    encrypted_filepaths = ue.get_encrypted_filepaths(filepaths)

    # Decrypt
    decrypted_filepaths = ue.get_decrypted_filepaths(encrypted_filepaths)

    ```

    """

    def __init__(
        self,
        filepaths: List[str] = None,
        filecontent: List[str] = None,
        columns: List[str] = None,
        encryption_password: str = None
    ):

        self.filepaths = filepaths or []
        self.filecontent = filecontent or []
        self.columns = columns or []
        # NOTE: falls back to a well-known default; use set_password() with a real secret.
        self.encryption_password = encryption_password or "password"
        self.encryption_parameters = self.get_encryption_parameters()
        self.start_tag = "#sc#"
        self.end_tag = "#ec#"
        # Per-run cache {plain value: record} so that a value shared by several
        # filepaths is replaced by the same encrypted token in all of them.
        self.store = None

    def set_password(self, password: str):
        """Replace the password used by every later encrypt/decrypt call."""
        self.encryption_password = password

    def add_tags(self, value: str):
        """Wrap `value` between the start and end tags."""
        return self.start_tag + value + self.end_tag

    def rem_tags(self, value: str):
        """Return the list of texts found between start/end tag pairs in `value`."""
        return self._tag_pattern().findall(value)

    def encrypt(self, value: str):
        """Encrypt `str(value)` with the current password and return it tagged."""
        return self.add_tags(AESCipher(self.encryption_password).encrypt(str(value)))

    def decrypt(self, encrypted_value: str):
        """
        Decrypt `encrypted_value`, given with or without the surrounding tags.

        Non-string values are returned unchanged.
        """

        if not isinstance(encrypted_value, str):
            return encrypted_value

        if self._is_tagged(encrypted_value):
            encrypted_value = self.rem_tags(encrypted_value)[0]
        # NOTE(review): a single '=' is restored when the value does not end with
        # one - presumably base64 padding lost upstream; confirm against AESCipher.
        if not encrypted_value.endswith("="):
            encrypted_value = encrypted_value + "="

        return AESCipher(self.encryption_password).decrypt(encrypted_value)

    def get_encryption_parameters(self):
        """Return the configured filepaths, filecontent and columns as a dict."""
        return dict(
            filepaths = self.filepaths,
            filecontent = self.filecontent,
            columns = self.columns
        )

    def _tag_pattern(self):
        """Return the compiled regex capturing the text between the tags."""
        return re.compile(re.escape(self.start_tag) + "(.*?)" + re.escape(self.end_tag))

    def _is_tagged(self, value) -> bool:
        """Tell whether `value` is a string fully wrapped in the start/end tags."""
        return (
            isinstance(value, str)
            and len(value) >= len(self.start_tag) + len(self.end_tag)
            and value.startswith(self.start_tag)
            and value.endswith(self.end_tag)
        )

    @staticmethod
    def _find_values(regexpr: str, text: str) -> set:
        """Return the non-empty texts captured by `regexpr` in `text`."""
        found = set()
        for match in re.findall(regexpr, text):
            # findall yields tuples when the pattern has several groups
            groups = match if isinstance(match, tuple) else (match,)
            found.update(group for group in groups if group)
        return found

    @staticmethod
    def _replace_literals(text: str, replacements: Dict[str, str]) -> str:
        """
        Replace every key of `replacements` found in `text` by its value.

        The replacement is done in a single pass over the original text, so a
        key can never match inside a value that was just inserted. Longer keys
        win over shorter ones that overlap them.
        """
        keys = sorted((key for key in replacements if key), key=len, reverse=True)
        if not keys:
            return text
        literal_regex = re.compile("|".join(re.escape(key) for key in keys))
        return literal_regex.sub(lambda match: replacements[match.group(0)], text)

    def _decrypt_tokens(self, text: str) -> str:
        """Replace every tagged token found in `text` by its decrypted value."""
        decrypted = {}

        def _plain(match):
            inner = match.group(1)
            if inner not in decrypted:
                decrypted[inner] = self.decrypt(inner)
            return decrypted[inner]

        return self._tag_pattern().sub(_plain, text)

    def _encrypt_columns(self, df):
        """Encrypt the cells of the configured columns present in `df`."""
        for col in self.columns:
            if col not in df.columns:
                continue
            # Missing cells are left alone instead of being encrypted as "nan".
            encrypted_by_value = {val: self.encrypt(val) for val in df[col].dropna().unique()}
            df[col] = df[col].apply(lambda cell: encrypted_by_value.get(cell, cell))
        return df

    def _decrypt_columns(self, df):
        """Decrypt the tagged cells of the configured columns present in `df`."""
        for col in self.columns:
            if col not in df.columns:
                continue
            # Only tagged cells were produced by encrypt(); anything else is kept as is.
            decrypted_by_value = {
                val: self.decrypt(val) for val in df[col].unique() if self._is_tagged(val)
            }
            df[col] = df[col].apply(lambda cell: decrypted_by_value.get(cell, cell))
        return df

    @staticmethod
    def _rewrite_excel(filepath: str, transform):
        """Apply `transform` to every sheet of the workbook and save it in place."""
        with pd.ExcelFile(filepath) as excel:
            sheets = {
                sheet_name: transform(pd.read_excel(excel, sheet_name))
                for sheet_name in excel.sheet_names
            }
        # The context manager saves and closes the workbook
        # (ExcelWriter.save() no longer exists in pandas 2.x).
        with pd.ExcelWriter(filepath) as writer:
            for name, df in sheets.items():
                df.to_excel(writer, sheet_name=name, index=False)

    def encrypt_filepath(self, filepath: str):
        """
        Return `filepath` with every value matched by `self.filepaths` encrypted.

        Values are looked up in the original path only, so a later pattern can
        not match inside the ciphertext or tags inserted for an earlier one.
        Encrypted values are remembered in `self.store` and reused.
        """

        if self.store is None:
            self.store = {}

        replacements = {}
        for regexpr in self.filepaths:
            for val in self._find_values(regexpr, filepath):
                if val not in self.store:
                    self.store[val] = {
                        'regexpr': regexpr,
                        'not_encrypted': val,
                        'encrypted': self.encrypt(val)
                    }
                replacements[val] = self.store[val]['encrypted']

        return self._replace_literals(filepath, replacements)

    def decrypt_filepath(self, filepath: str):
        """Return `filepath` with every tagged token replaced by its decrypted value."""
        return self._decrypt_tokens(filepath)

    def mirror_dirs(self, filepaths_dict: Dict[str, str], enctype: str) -> List[str]:
        """
        Copy each source file to its destination under `<FILE_UPLOAD_PATH>/<enctype>ed`.

        `filepaths_dict` maps source path -> processed (encrypted/decrypted) path.
        Returns the list of paths the files were copied to.
        """

        assert enctype in ["encrypt", "decrypt"]

        dstdir = os.path.join(envs.FILE_UPLOAD_PATH, f"{enctype}ed")

        processed_filepaths = []
        for sourcepath, destinationpath in filepaths_dict.items():

            # Strip the upload root and the first path component, then re-root under dstdir.
            relative_parts = destinationpath.replace(envs.FILE_UPLOAD_PATH, "").split(os.path.sep)[1:]
            dstpath = os.path.join(dstdir, *relative_parts)

            os.makedirs(os.path.dirname(dstpath), exist_ok=True)

            shutil.copy2(sourcepath, dstpath)
            processed_filepaths.append(dstpath)

        return processed_filepaths

    def get_src_dst_files(self, filepaths: List[str], enctype: str):
        """
        Map every existing path of `filepaths` to its encrypted/decrypted path.

        Paths that do not exist on disk are skipped.
        """

        assert enctype in ["encrypt", "decrypt"]

        process = self.encrypt_filepath if enctype == "encrypt" else self.decrypt_filepath

        self.store = {}
        try:
            filepaths_dict = {}
            for fp in filepaths:
                if not os.path.exists(fp):
                    continue
                filepaths_dict[fp] = process(fp)
        finally:
            # The cache is only valid for one batch of paths.
            self.store = None

        return filepaths_dict

    def get_encrypted_filepaths(self, filepaths: List[str]):
        """Encrypt paths and content of `filepaths`; return the paths of the encrypted copies."""

        encrypted_filepaths_dict = self.get_src_dst_files(filepaths, 'encrypt')
        encrypted_filepaths_list = self.mirror_dirs(encrypted_filepaths_dict, 'encrypt')
        self.encrypt_filecontent(encrypted_filepaths_list)

        return encrypted_filepaths_list

    def get_decrypted_filepaths(self, filepaths: List[str]):
        """Decrypt paths and content of `filepaths`; return the paths of the decrypted copies."""

        decrypted_filepaths_dict = self.get_src_dst_files(filepaths, 'decrypt')
        decrypted_filepaths_list = self.mirror_dirs(decrypted_filepaths_dict, 'decrypt')
        self.decrypt_filecontent(decrypted_filepaths_list)

        return decrypted_filepaths_list

    def encrypt_non_excel_filecontent(self, filepath: str):
        """
        Encrypt, in place, every text matched by `self.filecontent` in a text file.

        Returns the new content of the file.
        """

        with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
            content = f.read()

        to_encrypt_values = set()
        for regexp in self.filecontent:
            to_encrypt_values |= self._find_values(regexp, content)

        encryption_dict = {tev: self.encrypt(tev) for tev in to_encrypt_values}

        # Literal replacement: the matched values are data, not regex patterns.
        content = self._replace_literals(content, encryption_dict)

        with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
            f.write(content)

        return content

    def decrypt_non_excel_filecontent(self, filepath: str):
        """
        Decrypt, in place, every tagged token found in a text file.

        Returns the new content of the file.
        """

        with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
            content = f.read()

        content = self._decrypt_tokens(content)

        with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
            f.write(content)

        return content

    def decrypt_excel_filecontent(self, filepath: str):
        """Decrypt, in place, the configured columns in every sheet of an excel file."""
        self._rewrite_excel(filepath, self._decrypt_columns)
        return filepath

    def decrypt_csv_filecontent(self, filepath: str):
        """Decrypt, in place, the configured columns of a csv file."""

        # TODO - handle big csv's
        df = self._decrypt_columns(pd.read_csv(filepath))
        df.to_csv(filepath, index=False)

        return filepath

    def encrypt_excel_filecontent(self, filepath: str):
        """Encrypt, in place, the configured columns in every sheet of an excel file."""
        self._rewrite_excel(filepath, self._encrypt_columns)
        return filepath

    def encrypt_csv_filecontent(self,  filepath: str):
        """Encrypt, in place, the configured columns of a csv file."""

        # TODO - handle big csv's
        df = self._encrypt_columns(pd.read_csv(filepath))
        df.to_csv(filepath, index=False)

        return filepath

    def encrypt_filecontent(self, filepaths: List[str]):
        """
        Encrypt the content of every file in `filepaths` according to its extension.

        txt/xml/csv files get the regex based encryption; csv and excel files
        additionally get the column encryption when columns are configured.
        """

        for fp in filepaths:
            if fp.endswith(('.txt', '.xml', '.csv', )):
                self.encrypt_non_excel_filecontent(fp)
            if fp.endswith('.csv') and self.columns:
                self.encrypt_csv_filecontent(fp)
            elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
                self.encrypt_excel_filecontent(fp)

        return filepaths

    def decrypt_filecontent(self, filepaths: List[str]):
        """
        Decrypt the content of every file in `filepaths` according to its extension.

        For csv files the steps of `encrypt_filecontent` are undone in reverse
        order: columns first (through the csv parser, which keeps quoting
        valid), then the remaining tagged tokens in the text.
        """

        for fp in filepaths:
            if fp.endswith('.csv') and self.columns:
                self.decrypt_csv_filecontent(fp)
            if fp.endswith(('.txt', '.xml', '.csv', )):
                self.decrypt_non_excel_filecontent(fp)
            elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
                self.decrypt_excel_filecontent(fp)

        return filepaths

Classes

class UploaderEncryptor (filepaths: List[str] = None, filecontent: List[str] = None, columns: List[str] = None, encryption_password: str = None)

This class will use AES encryption algorithm to encrypt files given. Each parameter receives either a simple string which will be encrypted or a regex string.

The regex string needs to be grouped: Ex:

    Input: "/Collection-deviceName"
    filepaths: ["Collection-(.+?)"] will return this: "/Collection-iGSS0a9GfPDJ24ni3vfmRSrPIYdY3kFj4-EsRjfz9E0="

Params:

filepaths = ["Collection-(.+?)", "DeviceName"] # encrypts the matching parts of the filepath, including the filename
filecontent = ["ODBName(.+?)"] # encrypts every piece of text matched by the regex in the file content (csv, txt, xml)
columns = ["OS", "IP"] # encrypts all data in the listed columns wherever they are found (csv, excel with multiple sheets)
encryption_password = "secret password" # the password used for encrypting and decrypting data

You can also set the password later with set_password("password").

Usage:


from licenseware.uploader_encryptor import UploaderEncryptor

filepaths = [
    "test_files/RVTools.xlsx",
    "test_files/LMS_OPTIONS_SECRET.csv",
    "test_files/cpuq.txt",
    "test_files/rl/deviceName_database_version.csv",
    "test_files/rl/deviceName_database_options.csv",
    "test_files/rl/deviceName_database_dba_feature.csv",
]

ue = UploaderEncryptor(
    filepaths=["deviceName", "database", "LMS_OPTIONS_(.*?).csv", "rl"],
    filecontent=["Machine Name=(.+)", "System IP Address 1=(.+)"],
    columns=["DB_NAME", "MACHINE_ID", "HOST_NAME", "INSTANCE_NAME", "Host", "Device"]
)

ue.set_password("password")

# Encrypt
encrypted_filepaths = ue.get_encrypted_filepaths(filepaths)

# Decrypt
decrypted_filepaths = ue.get_decrypted_filepaths(encrypted_filepaths)

Expand source code
class UploaderEncryptor:

    """

    Encrypt and decrypt uploaded files (paths, text content and table columns) with AES.

    Every encrypted value is wrapped between `start_tag` and `end_tag` so that
    it can be located again when decrypting.

    Each entry of `filepaths` / `filecontent` is either a plain string, which is
    encrypted wherever it is found, or a regex with a capturing group, in which
    case only the captured text is encrypted.
    Ex:
    ```
        Input: "/Collection-deviceName"
        filepaths: ["Collection-(.+)"] will return something like:
        "/Collection-#sc#iGSS0a9GfPDJ24ni3vfmRSrPIYdY3kFj4-EsRjfz9E0=#ec#"
    ```

    Params:

    filepaths = ["Collection-(.+)", "DeviceName"] # parts of the filepath (including the filename) to encrypt
    filecontent = ["ODBName(.+)"] # text found with regex in the file content (csv, txt, xml), all occurrences are replaced
    columns = ["OS", "IP"] # encrypt the entire column data where the specified columns are found (csv, excel with multiple sheets)
    encryption_password = "secret password" # the password that will be used for encrypting and decrypting data
    You can also set password later with `set_password("password")`

    Usage:

    ```py

    from licenseware.uploader_encryptor import UploaderEncryptor

    filepaths = [
        "test_files/RVTools.xlsx",
        "test_files/LMS_OPTIONS_SECRET.csv",
        "test_files/cpuq.txt",
        "test_files/rl/deviceName_database_version.csv",
        "test_files/rl/deviceName_database_options.csv",
        "test_files/rl/deviceName_database_dba_feature.csv",
    ]

    ue = UploaderEncryptor(
        filepaths=["deviceName", "database", "LMS_OPTIONS_(.*?).csv", "rl"],
        filecontent=["Machine Name=(.+)", "System IP Address 1=(.+)"],
        columns=["DB_NAME", "MACHINE_ID", "HOST_NAME", "INSTANCE_NAME", "Host", "Device"]
    )

    ue.set_password("password")

    # Encrypt
    encrypted_filepaths = ue.get_encrypted_filepaths(filepaths)

    # Decrypt
    decrypted_filepaths = ue.get_decrypted_filepaths(encrypted_filepaths)

    ```

    """

    def __init__(
        self,
        filepaths: List[str] = None,
        filecontent: List[str] = None,
        columns: List[str] = None,
        encryption_password: str = None
    ):

        self.filepaths = filepaths or []
        self.filecontent = filecontent or []
        self.columns = columns or []
        # NOTE: falls back to a well-known default; use set_password() with a real secret.
        self.encryption_password = encryption_password or "password"
        self.encryption_parameters = self.get_encryption_parameters()
        self.start_tag = "#sc#"
        self.end_tag = "#ec#"
        # Per-run cache {plain value: record} so that a value shared by several
        # filepaths is replaced by the same encrypted token in all of them.
        self.store = None

    def set_password(self, password: str):
        """Replace the password used by every later encrypt/decrypt call."""
        self.encryption_password = password

    def add_tags(self, value: str):
        """Wrap `value` between the start and end tags."""
        return self.start_tag + value + self.end_tag

    def rem_tags(self, value: str):
        """Return the list of texts found between start/end tag pairs in `value`."""
        return self._tag_pattern().findall(value)

    def encrypt(self, value: str):
        """Encrypt `str(value)` with the current password and return it tagged."""
        return self.add_tags(AESCipher(self.encryption_password).encrypt(str(value)))

    def decrypt(self, encrypted_value: str):
        """
        Decrypt `encrypted_value`, given with or without the surrounding tags.

        Non-string values are returned unchanged.
        """

        if not isinstance(encrypted_value, str):
            return encrypted_value

        if self._is_tagged(encrypted_value):
            encrypted_value = self.rem_tags(encrypted_value)[0]
        # NOTE(review): a single '=' is restored when the value does not end with
        # one - presumably base64 padding lost upstream; confirm against AESCipher.
        if not encrypted_value.endswith("="):
            encrypted_value = encrypted_value + "="

        return AESCipher(self.encryption_password).decrypt(encrypted_value)

    def get_encryption_parameters(self):
        """Return the configured filepaths, filecontent and columns as a dict."""
        return dict(
            filepaths = self.filepaths,
            filecontent = self.filecontent,
            columns = self.columns
        )

    def _tag_pattern(self):
        """Return the compiled regex capturing the text between the tags."""
        return re.compile(re.escape(self.start_tag) + "(.*?)" + re.escape(self.end_tag))

    def _is_tagged(self, value) -> bool:
        """Tell whether `value` is a string fully wrapped in the start/end tags."""
        return (
            isinstance(value, str)
            and len(value) >= len(self.start_tag) + len(self.end_tag)
            and value.startswith(self.start_tag)
            and value.endswith(self.end_tag)
        )

    @staticmethod
    def _find_values(regexpr: str, text: str) -> set:
        """Return the non-empty texts captured by `regexpr` in `text`."""
        found = set()
        for match in re.findall(regexpr, text):
            # findall yields tuples when the pattern has several groups
            groups = match if isinstance(match, tuple) else (match,)
            found.update(group for group in groups if group)
        return found

    @staticmethod
    def _replace_literals(text: str, replacements: Dict[str, str]) -> str:
        """
        Replace every key of `replacements` found in `text` by its value.

        The replacement is done in a single pass over the original text, so a
        key can never match inside a value that was just inserted. Longer keys
        win over shorter ones that overlap them.
        """
        keys = sorted((key for key in replacements if key), key=len, reverse=True)
        if not keys:
            return text
        literal_regex = re.compile("|".join(re.escape(key) for key in keys))
        return literal_regex.sub(lambda match: replacements[match.group(0)], text)

    def _decrypt_tokens(self, text: str) -> str:
        """Replace every tagged token found in `text` by its decrypted value."""
        decrypted = {}

        def _plain(match):
            inner = match.group(1)
            if inner not in decrypted:
                decrypted[inner] = self.decrypt(inner)
            return decrypted[inner]

        return self._tag_pattern().sub(_plain, text)

    def _encrypt_columns(self, df):
        """Encrypt the cells of the configured columns present in `df`."""
        for col in self.columns:
            if col not in df.columns:
                continue
            # Missing cells are left alone instead of being encrypted as "nan".
            encrypted_by_value = {val: self.encrypt(val) for val in df[col].dropna().unique()}
            df[col] = df[col].apply(lambda cell: encrypted_by_value.get(cell, cell))
        return df

    def _decrypt_columns(self, df):
        """Decrypt the tagged cells of the configured columns present in `df`."""
        for col in self.columns:
            if col not in df.columns:
                continue
            # Only tagged cells were produced by encrypt(); anything else is kept as is.
            decrypted_by_value = {
                val: self.decrypt(val) for val in df[col].unique() if self._is_tagged(val)
            }
            df[col] = df[col].apply(lambda cell: decrypted_by_value.get(cell, cell))
        return df

    @staticmethod
    def _rewrite_excel(filepath: str, transform):
        """Apply `transform` to every sheet of the workbook and save it in place."""
        with pd.ExcelFile(filepath) as excel:
            sheets = {
                sheet_name: transform(pd.read_excel(excel, sheet_name))
                for sheet_name in excel.sheet_names
            }
        # The context manager saves and closes the workbook
        # (ExcelWriter.save() no longer exists in pandas 2.x).
        with pd.ExcelWriter(filepath) as writer:
            for name, df in sheets.items():
                df.to_excel(writer, sheet_name=name, index=False)

    def encrypt_filepath(self, filepath: str):
        """
        Return `filepath` with every value matched by `self.filepaths` encrypted.

        Values are looked up in the original path only, so a later pattern can
        not match inside the ciphertext or tags inserted for an earlier one.
        Encrypted values are remembered in `self.store` and reused.
        """

        if self.store is None:
            self.store = {}

        replacements = {}
        for regexpr in self.filepaths:
            for val in self._find_values(regexpr, filepath):
                if val not in self.store:
                    self.store[val] = {
                        'regexpr': regexpr,
                        'not_encrypted': val,
                        'encrypted': self.encrypt(val)
                    }
                replacements[val] = self.store[val]['encrypted']

        return self._replace_literals(filepath, replacements)

    def decrypt_filepath(self, filepath: str):
        """Return `filepath` with every tagged token replaced by its decrypted value."""
        return self._decrypt_tokens(filepath)

    def mirror_dirs(self, filepaths_dict: Dict[str, str], enctype: str) -> List[str]:
        """
        Copy each source file to its destination under `<FILE_UPLOAD_PATH>/<enctype>ed`.

        `filepaths_dict` maps source path -> processed (encrypted/decrypted) path.
        Returns the list of paths the files were copied to.
        """

        assert enctype in ["encrypt", "decrypt"]

        dstdir = os.path.join(envs.FILE_UPLOAD_PATH, f"{enctype}ed")

        processed_filepaths = []
        for sourcepath, destinationpath in filepaths_dict.items():

            # Strip the upload root and the first path component, then re-root under dstdir.
            relative_parts = destinationpath.replace(envs.FILE_UPLOAD_PATH, "").split(os.path.sep)[1:]
            dstpath = os.path.join(dstdir, *relative_parts)

            os.makedirs(os.path.dirname(dstpath), exist_ok=True)

            shutil.copy2(sourcepath, dstpath)
            processed_filepaths.append(dstpath)

        return processed_filepaths

    def get_src_dst_files(self, filepaths: List[str], enctype: str):
        """
        Map every existing path of `filepaths` to its encrypted/decrypted path.

        Paths that do not exist on disk are skipped.
        """

        assert enctype in ["encrypt", "decrypt"]

        process = self.encrypt_filepath if enctype == "encrypt" else self.decrypt_filepath

        self.store = {}
        try:
            filepaths_dict = {}
            for fp in filepaths:
                if not os.path.exists(fp):
                    continue
                filepaths_dict[fp] = process(fp)
        finally:
            # The cache is only valid for one batch of paths.
            self.store = None

        return filepaths_dict

    def get_encrypted_filepaths(self, filepaths: List[str]):
        """Encrypt paths and content of `filepaths`; return the paths of the encrypted copies."""

        encrypted_filepaths_dict = self.get_src_dst_files(filepaths, 'encrypt')
        encrypted_filepaths_list = self.mirror_dirs(encrypted_filepaths_dict, 'encrypt')
        self.encrypt_filecontent(encrypted_filepaths_list)

        return encrypted_filepaths_list

    def get_decrypted_filepaths(self, filepaths: List[str]):
        """Decrypt paths and content of `filepaths`; return the paths of the decrypted copies."""

        decrypted_filepaths_dict = self.get_src_dst_files(filepaths, 'decrypt')
        decrypted_filepaths_list = self.mirror_dirs(decrypted_filepaths_dict, 'decrypt')
        self.decrypt_filecontent(decrypted_filepaths_list)

        return decrypted_filepaths_list

    def encrypt_non_excel_filecontent(self, filepath: str):
        """
        Encrypt, in place, every text matched by `self.filecontent` in a text file.

        Returns the new content of the file.
        """

        with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
            content = f.read()

        to_encrypt_values = set()
        for regexp in self.filecontent:
            to_encrypt_values |= self._find_values(regexp, content)

        encryption_dict = {tev: self.encrypt(tev) for tev in to_encrypt_values}

        # Literal replacement: the matched values are data, not regex patterns.
        content = self._replace_literals(content, encryption_dict)

        with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
            f.write(content)

        return content

    def decrypt_non_excel_filecontent(self, filepath: str):
        """
        Decrypt, in place, every tagged token found in a text file.

        Returns the new content of the file.
        """

        with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
            content = f.read()

        content = self._decrypt_tokens(content)

        with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
            f.write(content)

        return content

    def decrypt_excel_filecontent(self, filepath: str):
        """Decrypt, in place, the configured columns in every sheet of an excel file."""
        self._rewrite_excel(filepath, self._decrypt_columns)
        return filepath

    def decrypt_csv_filecontent(self, filepath: str):
        """Decrypt, in place, the configured columns of a csv file."""

        # TODO - handle big csv's
        df = self._decrypt_columns(pd.read_csv(filepath))
        df.to_csv(filepath, index=False)

        return filepath

    def encrypt_excel_filecontent(self, filepath: str):
        """Encrypt, in place, the configured columns in every sheet of an excel file."""
        self._rewrite_excel(filepath, self._encrypt_columns)
        return filepath

    def encrypt_csv_filecontent(self,  filepath: str):
        """Encrypt, in place, the configured columns of a csv file."""

        # TODO - handle big csv's
        df = self._encrypt_columns(pd.read_csv(filepath))
        df.to_csv(filepath, index=False)

        return filepath

    def encrypt_filecontent(self, filepaths: List[str]):
        """
        Encrypt the content of every file in `filepaths` according to its extension.

        txt/xml/csv files get the regex based encryption; csv and excel files
        additionally get the column encryption when columns are configured.
        """

        for fp in filepaths:
            if fp.endswith(('.txt', '.xml', '.csv', )):
                self.encrypt_non_excel_filecontent(fp)
            if fp.endswith('.csv') and self.columns:
                self.encrypt_csv_filecontent(fp)
            elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
                self.encrypt_excel_filecontent(fp)

        return filepaths

    def decrypt_filecontent(self, filepaths: List[str]):
        """
        Decrypt the content of every file in `filepaths` according to its extension.

        For csv files the steps of `encrypt_filecontent` are undone in reverse
        order: columns first (through the csv parser, which keeps quoting
        valid), then the remaining tagged tokens in the text.
        """

        for fp in filepaths:
            if fp.endswith('.csv') and self.columns:
                self.decrypt_csv_filecontent(fp)
            if fp.endswith(('.txt', '.xml', '.csv', )):
                self.decrypt_non_excel_filecontent(fp)
            elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
                self.decrypt_excel_filecontent(fp)

        return filepaths

Methods

def add_tags(self, value: str)
Expand source code
def add_tags(self, value: str):
    """Return `value` wrapped between `self.start_tag` and `self.end_tag`."""
    pieces = (self.start_tag, value, self.end_tag)
    return "".join(pieces)
def decrypt(self, encrypted_value: str)
Expand source code
def decrypt(self, encrypted_value: str):
    """
    Decrypt `encrypted_value`, given with or without the surrounding tags.

    Values that are not strings are handed back untouched.
    """
    if isinstance(encrypted_value, str):
        payload = encrypted_value
        wrapped = payload.startswith(self.start_tag) and payload.endswith(self.end_tag)
        if wrapped:
            # keep only the text between the first pair of tags
            payload = self.rem_tags(payload)[0]
        if not payload.endswith("="):
            payload += "="
        cipher = AESCipher(self.encryption_password)
        return cipher.decrypt(payload)

    return encrypted_value
def decrypt_csv_filecontent(self, filepath: str)
Expand source code
def decrypt_csv_filecontent(self, filepath: str):
    """
    Decrypt, in place, the configured columns of a csv file.

    Only cells that are fully wrapped in the start/end tags are decrypted;
    cells that were never encrypted are left as they are instead of being
    pushed through the cipher.

    Returns `filepath`.
    """

    def is_tagged(cell):
        return (
            isinstance(cell, str)
            and cell.startswith(self.start_tag)
            and cell.endswith(self.end_tag)
        )

    # TODO - handle big csv's
    df = pd.read_csv(filepath)

    for col in self.columns:
        if col not in df.columns:
            continue

        # decrypt each distinct value once
        decrypted_by_value = {
            val: self.decrypt(val) for val in df[col].unique() if is_tagged(val)
        }
        df[col] = df[col].apply(lambda cell: decrypted_by_value.get(cell, cell))

    df.to_csv(filepath, index=False)

    return filepath
def decrypt_excel_filecontent(self, filepath: str)
Expand source code
def decrypt_excel_filecontent(self, filepath: str):
    """
    Decrypt, in place, the configured columns in every sheet of an excel file.

    Only cells fully wrapped in the start/end tags are decrypted.

    Returns `filepath`.
    """

    def is_tagged(cell):
        return (
            isinstance(cell, str)
            and cell.startswith(self.start_tag)
            and cell.endswith(self.end_tag)
        )

    decrypted_dfs = {}
    # Close the workbook before it is overwritten below.
    with pd.ExcelFile(filepath) as excel:
        for sheet_name in excel.sheet_names:

            df = pd.read_excel(excel, sheet_name)

            for col in self.columns:
                if col not in df.columns:
                    continue

                decrypted_by_value = {
                    val: self.decrypt(val) for val in df[col].unique() if is_tagged(val)
                }
                df[col] = df[col].apply(lambda cell: decrypted_by_value.get(cell, cell))

            decrypted_dfs[sheet_name] = df

    # The context manager saves and closes the file
    # (ExcelWriter.save() no longer exists in pandas 2.x).
    with pd.ExcelWriter(filepath) as writer:
        for name, df in decrypted_dfs.items():
            df.to_excel(writer, sheet_name=name, index=False)

    return filepath
def decrypt_filecontent(self, filepaths: List[str])
Expand source code
def decrypt_filecontent(self, filepaths: List[str]):
    """
    Decrypt the content of every file in `filepaths` according to its extension.

    csv files are handled by both the column decryption (when columns are
    configured) and the text decryption; previously the csv column branch
    could never be reached because '.csv' was already caught by the text
    branch. Columns go first so that the csv parser takes care of quoting.

    Returns `filepaths`.
    """

    for fp in filepaths:
        if fp.endswith('.csv') and self.columns:
            self.decrypt_csv_filecontent(fp)
        if fp.endswith(('.txt', '.xml', '.csv', )):
            self.decrypt_non_excel_filecontent(fp)
        elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
            self.decrypt_excel_filecontent(fp)

    return filepaths
def decrypt_filepath(self, filepath: str)
Expand source code
def decrypt_filepath(self, filepath: str):
    """Return `filepath` with each tagged token swapped for its decrypted text."""
    tag_regex = re.compile(f"{self.start_tag}(.*?){self.end_tag}")

    # tagged token -> decrypted text
    plain_by_token = {
        self.add_tags(inner): self.decrypt(inner)
        for inner in tag_regex.findall(filepath)
    }

    result = filepath
    for token, plain in plain_by_token.items():
        result = result.replace(token, plain)
    return result
def decrypt_non_excel_filecontent(self, filepath: str)
Expand source code
def decrypt_non_excel_filecontent(self, filepath: str):
    """
    Decrypt, in place, every tagged token found in a text file.

    Tokens are replaced literally: the ciphertext is never interpreted as a
    regex and the decrypted text is never interpreted as a replacement
    template (so '+', '\\', '$' and friends are safe on both sides).

    Returns the new content of the file.
    """

    with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
        content = f.read()

    tag_regex = re.compile(re.escape(self.start_tag) + "(.*?)" + re.escape(self.end_tag))

    decrypted = {}

    def _plain(match):
        # decrypt each distinct token once
        inner = match.group(1)
        if inner not in decrypted:
            decrypted[inner] = self.decrypt(inner)
        return decrypted[inner]

    content = tag_regex.sub(_plain, content)

    with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
        f.write(content)

    return content
def encrypt(self, value: str)
Expand source code
def encrypt(self, value: str):
    """Encrypt `str(value)` with the current password and return it wrapped in the tags."""
    cipher = AESCipher(self.encryption_password)
    ciphertext = cipher.encrypt(str(value))
    return self.add_tags(ciphertext)
def encrypt_csv_filecontent(self, filepath: str)
Expand source code
def encrypt_csv_filecontent(self,  filepath: str):
    """
    Encrypt, in place, the configured columns of a csv file.

    Missing cells are left empty; previously they could end up encrypted as
    the text "nan", which does not decrypt back to an empty cell.

    Returns `filepath`.
    """

    # TODO - handle big csv's
    df = pd.read_csv(filepath)

    for col in self.columns:
        if col not in df.columns:
            continue

        # encrypt each distinct, non-missing value once
        encrypted_by_value = {val: self.encrypt(val) for val in df[col].dropna().unique()}
        df[col] = df[col].apply(lambda cell: encrypted_by_value.get(cell, cell))

    df.to_csv(filepath, index=False)

    return filepath
def encrypt_excel_filecontent(self, filepath: str)
Expand source code
def encrypt_excel_filecontent(self, filepath: str):
    """
    Encrypt, in place, the configured columns in every sheet of an excel file.

    Missing cells are left empty instead of being encrypted.

    Returns `filepath`.
    """

    encrypted_dfs = {}
    # Close the workbook before it is overwritten below.
    with pd.ExcelFile(filepath) as excel:
        for sheet_name in excel.sheet_names:

            df = pd.read_excel(excel, sheet_name)

            for col in self.columns:
                if col not in df.columns:
                    continue

                encrypted_by_value = {
                    val: self.encrypt(val) for val in df[col].dropna().unique()
                }
                df[col] = df[col].apply(lambda cell: encrypted_by_value.get(cell, cell))

            encrypted_dfs[sheet_name] = df

    # The context manager saves and closes the file
    # (ExcelWriter.save() no longer exists in pandas 2.x).
    with pd.ExcelWriter(filepath) as writer:
        for name, df in encrypted_dfs.items():
            df.to_excel(writer, sheet_name=name, index=False)

    return filepath
def encrypt_filecontent(self, filepaths: List[str])
Expand source code
def encrypt_filecontent(self, filepaths: List[str]):
    """
    Encrypt the content of every file in `filepaths` according to its extension.

    txt/xml/csv files get the regex based encryption. csv files additionally
    get the column encryption when columns are configured; previously that
    branch could never be reached because '.csv' was already caught by the
    text branch. Excel files only get the column encryption.

    Returns `filepaths`.
    """

    for fp in filepaths:
        if fp.endswith(('.txt', '.xml', '.csv', )):
            self.encrypt_non_excel_filecontent(fp)
        if fp.endswith('.csv') and self.columns:
            self.encrypt_csv_filecontent(fp)
        elif fp.endswith(('.xls', '.xlsx', )) and self.columns:
            self.encrypt_excel_filecontent(fp)

    return filepaths
def encrypt_filepath(self, filepath: str)
Expand source code
def encrypt_filepath(self, filepath: str):
    """
    Return `filepath` with every value matched by `self.filepaths` encrypted.

    All values are looked up in the original path and replaced in a single
    pass, so a later pattern can not match inside the ciphertext or the tags
    inserted for an earlier one. When values overlap, the longest one wins.
    Encrypted values are remembered in `self.store` and reused, so the same
    value gets the same token in every path of a batch.
    """

    if self.store is None:
        self.store = {}

    replacements = {}
    for regexpr in self.filepaths:
        for match in re.findall(regexpr, filepath):
            # findall yields tuples when the pattern has several groups
            groups = match if isinstance(match, tuple) else (match,)
            for val in groups:
                if not val:
                    continue
                if val not in self.store:
                    self.store[val] = {
                        'regexpr': regexpr,
                        'not_encrypted': val,
                        'encrypted': self.encrypt(val)
                    }
                replacements[val] = self.store[val]['encrypted']

    if not replacements:
        return filepath

    longest_first = sorted(replacements, key=len, reverse=True)
    literal_regex = re.compile("|".join(re.escape(val) for val in longest_first))
    return literal_regex.sub(lambda m: replacements[m.group(0)], filepath)
def encrypt_non_excel_filecontent(self, filepath: str)
Expand source code
def encrypt_non_excel_filecontent(self, filepath: str):
    """
    Encrypt, in place, every text matched by `self.filecontent` in a text file.

    All matches of every pattern are collected (a pattern matching several
    times used to raise a TypeError) and replaced literally in one pass, so
    matched values containing regex metacharacters are handled and a value
    can not match inside an already inserted token.

    Returns the new content of the file.
    """

    with open(filepath, 'r', encoding="utf-8", errors="ignore") as f:
        content = f.read()

    to_encrypt_values = set()
    for regexp in self.filecontent:
        for match in re.findall(regexp, content):
            # findall yields tuples when the pattern has several groups
            groups = match if isinstance(match, tuple) else (match,)
            to_encrypt_values.update(group for group in groups if group)

    if to_encrypt_values:
        encryption_dict = {tev: self.encrypt(tev) for tev in to_encrypt_values}
        longest_first = sorted(encryption_dict, key=len, reverse=True)
        literal_regex = re.compile("|".join(re.escape(val) for val in longest_first))
        content = literal_regex.sub(lambda m: encryption_dict[m.group(0)], content)

    with open(filepath, 'w', encoding="utf-8", errors="ignore") as f:
        f.write(content)

    return content
def get_decrypted_filepaths(self, filepaths: List[str])
Expand source code
def get_decrypted_filepaths(self, filepaths: List[str]):
    """
    Decrypt the given files and return the paths of the decrypted copies.

    The paths are decrypted first, the files are then copied to their
    decrypted location and finally their content is decrypted there.
    """
    src_to_dst = self.get_src_dst_files(filepaths, 'decrypt')
    mirrored = self.mirror_dirs(src_to_dst, 'decrypt')
    self.decrypt_filecontent(mirrored)
    return mirrored
def get_encrypted_filepaths(self, filepaths: List[str])
Expand source code
def get_encrypted_filepaths(self, filepaths: List[str]):
    """
    Encrypt the given files and return the paths of the encrypted copies.

    The paths are encrypted first, the files are then copied to their
    encrypted location and finally their content is encrypted there.
    """
    src_to_dst = self.get_src_dst_files(filepaths, 'encrypt')
    mirrored = self.mirror_dirs(src_to_dst, 'encrypt')
    self.encrypt_filecontent(mirrored)
    return mirrored
def get_encryption_parameters(self)
Expand source code
def get_encryption_parameters(self):
    """Return the configured filepaths, filecontent and columns as a dict."""
    return {
        "filepaths": self.filepaths,
        "filecontent": self.filecontent,
        "columns": self.columns,
    }
def get_src_dst_files(self, filepaths: List[str], enctype: str)
Expand source code
def get_src_dst_files(self, filepaths: List[str], enctype: str):
    """
    Map every existing path of `filepaths` to its encrypted/decrypted path.

    Paths that do not exist on disk are skipped. `self.store` is emptied
    before the run and set back to None afterwards.
    """

    assert enctype in ["encrypt", "decrypt"]

    self.store = {}

    if enctype == "encrypt":
        transform = self.encrypt_filepath
    else:
        transform = self.decrypt_filepath

    src_to_dst = {src: transform(src) for src in filepaths if os.path.exists(src)}

    self.store = None

    return src_to_dst
def mirror_dirs(self, filepaths_dict: Dict[str, str], enctype: str) ‑> List[str]
Expand source code
def mirror_dirs(self, filepaths_dict: Dict[str, str], enctype: str) -> List[str]:
    """
    Copy each source file to its destination under `<FILE_UPLOAD_PATH>/<enctype>ed`.

    `filepaths_dict` maps source path -> processed (encrypted/decrypted) path.
    Missing destination folders are created. Returns the list of paths the
    files were copied to.
    """

    assert enctype in ["encrypt", "decrypt"]

    copied = []
    for src, dst in filepaths_dict.items():
        target_root = os.path.join(envs.FILE_UPLOAD_PATH, f"{enctype}ed")

        # drop the upload root and the first path component, keep the rest
        relative_parts = dst.replace(envs.FILE_UPLOAD_PATH, "").split(os.path.sep)[1:]
        target = os.path.join(target_root, *relative_parts)

        parent = os.path.dirname(target)
        if not os.path.exists(parent):
            os.makedirs(parent)

        shutil.copy2(src, target)
        copied.append(target)

    return copied
def rem_tags(self, value: str)
Expand source code
def rem_tags(self, value: str):
    """Return the list of texts found between start/end tag pairs in `value`."""
    tag_regex = re.compile(f"{self.start_tag}(.*?){self.end_tag}")
    return tag_regex.findall(value)
def set_password(self, password: str)
Expand source code
def set_password(self, password: str):
    """
    Replace the encryption password.

    The new password is stored on `self.encryption_password`.
    """
    self.encryption_password = password