Module licenseware.report_components.external_data_service

Expand source code
import requests
import traceback
import os

from licenseware.utils.logger import log

# Base URL of the registry service, read from the environment once at import
# time. os.getenv returns None when the variable is unset.
REGISTRY_SERVICE_URL = os.getenv("REGISTRY_SERVICE_URL")

class ExternalDataService:
    """Resolve and call endpoints that apps have published in the registry service.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def _get_registry_service_data(headers: dict, endpoint: str) -> dict:
        """Fetch ``{REGISTRY_SERVICE_URL}/{endpoint}`` and return the decoded JSON.

        Args:
            headers: HTTP headers sent along with the registry request.
            endpoint: Path appended to the registry base URL (this class uses
                ``"components"`` and ``"uploaders"``).

        Returns:
            The decoded JSON body. When the request or the decoding raises, the
            traceback is logged and ``{'data': []}`` is returned instead.
        """
        try:
            reg_data = requests.get(
                url=f"{REGISTRY_SERVICE_URL}/{endpoint}",
                headers=headers,
            )
            return reg_data.json()
        except Exception:
            log.error(traceback.format_exc())
            # The fallback has to be a dict: the lookup helpers below index the
            # result with ['data'], which would raise TypeError on a list.
            return {'data': []}

    @staticmethod
    def _get_component_url(components: dict, app_id: str, component_id: str) -> str:
        """Return the ``url`` of the first entry matching both ids.

        Args:
            components: Registry listing; its ``'data'`` item is iterated.
            app_id: Value the entry's ``'app_id'`` must equal.
            component_id: Value the entry's ``'component_id'`` must equal.

        Returns:
            The matching entry's ``'url'``, or ``False`` (after logging the
            traceback) when nothing matches or the listing is malformed.
        """
        try:
            matches = [
                entry['url']
                for entry in components['data']
                if entry['app_id'] == app_id and entry['component_id'] == component_id
            ]
            return matches[0]
        except (IndexError, KeyError, TypeError):
            # IndexError: no match. KeyError/TypeError: the listing does not
            # have the expected {'data': [{...}, ...]} layout.
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def get_data(_request, app_id: str, component_id: str, filter_payload: dict = None) -> list:
        """Fetch a component's data from the app that registered it.

        The component URL is looked up in the registry's ``components``
        listing. The URL is then called with POST (``filter_payload`` sent as
        JSON) when a truthy ``filter_payload`` is given, otherwise with GET.

        Args:
            _request: Object whose ``headers.get`` supplies the ``TenantId`` and
                ``Authorization`` values forwarded on every outgoing call.
            app_id: Id of the app owning the component.
            component_id: Id of the component to fetch.
            filter_payload: Optional JSON body; a falsy value means plain GET.

        Returns:
            The decoded JSON body on HTTP 200; ``[]`` when the component is not
            registered or the response status is not 200; ``False`` when an
            exception is raised (the traceback is logged).
        """
        try:
            headers = {
                "TenantId": _request.headers.get("TenantId"),
                "Authorization": _request.headers.get("Authorization"),
            }

            registry_service_components = ExternalDataService._get_registry_service_data(
                headers, "components"
            )
            service_url = ExternalDataService._get_component_url(
                components=registry_service_components,
                app_id=app_id,
                component_id=component_id,
            )
            if not service_url:
                return []

            if filter_payload:
                method = "POST"
                data = requests.post(
                    url=service_url, headers=headers, json=filter_payload
                )
            else:
                method = "GET"
                data = requests.get(url=service_url, headers=headers)

            if data.status_code == 200:
                return data.json()

            log.warning(f"Could not retrieve data for {component_id} from {app_id}")
            # Report the verb that was actually used, not a hard-coded GET.
            log.warning(f"{method} {service_url} {data.status_code}")
            return []
        except Exception:
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def _get_uploader_url(uploaders: dict, app_id: str, uploader_id: str) -> str:
        """Return the ``upload_url`` of the first entry matching both ids.

        Args:
            uploaders: Registry listing; its ``'data'`` item is iterated.
            app_id: Value the entry's ``'app_id'`` must equal.
            uploader_id: Value the entry's ``'uploader_id'`` must equal.

        Returns:
            The matching entry's ``'upload_url'``, or ``False`` (after logging
            the traceback) when nothing matches or the listing is malformed.
        """
        try:
            matches = [
                entry['upload_url']
                for entry in uploaders['data']
                if entry['app_id'] == app_id and entry['uploader_id'] == uploader_id
            ]
            return matches[0]
        except (IndexError, KeyError, TypeError):
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def get_upload_url(_request: dict, app_id: str, uploader_id: str) -> str:
        """Look up an uploader's upload URL in the registry service.

        Args:
            _request: Mapping whose ``get`` supplies the ``Tenantid`` and
                ``Authorization`` values forwarded to the registry.
            app_id: Id of the app owning the uploader.
            uploader_id: Id of the uploader.

        Returns:
            The upload URL, or ``None`` when no uploader matches.
        """
        # NOTE(review): the key read here is "Tenantid" while get_data reads
        # "TenantId"; presumably the mapping passed in already uses that
        # spelling - confirm against callers before changing it.
        headers = {
            "TenantId": _request.get("Tenantid"),
            "Authorization": _request.get("Authorization"),
        }
        registry_service_uploaders = ExternalDataService._get_registry_service_data(
            headers, "uploaders"
        )

        upload_url = ExternalDataService._get_uploader_url(
            uploaders=registry_service_uploaders,
            app_id=app_id,
            uploader_id=uploader_id,
        )

        # The helper signals "not found" with False; expose that as None.
        return upload_url or None

Classes

class ExternalDataService
Expand source code
class ExternalDataService:
    """Resolve and call endpoints that apps have published in the registry service.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def _get_registry_service_data(headers: dict, endpoint: str) -> dict:
        """Fetch ``{REGISTRY_SERVICE_URL}/{endpoint}`` and return the decoded JSON.

        Args:
            headers: HTTP headers sent along with the registry request.
            endpoint: Path appended to the registry base URL (this class uses
                ``"components"`` and ``"uploaders"``).

        Returns:
            The decoded JSON body. When the request or the decoding raises, the
            traceback is logged and ``{'data': []}`` is returned instead.
        """
        try:
            reg_data = requests.get(
                url=f"{REGISTRY_SERVICE_URL}/{endpoint}",
                headers=headers,
            )
            return reg_data.json()
        except Exception:
            log.error(traceback.format_exc())
            # The fallback has to be a dict: the lookup helpers below index the
            # result with ['data'], which would raise TypeError on a list.
            return {'data': []}

    @staticmethod
    def _get_component_url(components: dict, app_id: str, component_id: str) -> str:
        """Return the ``url`` of the first entry matching both ids.

        Args:
            components: Registry listing; its ``'data'`` item is iterated.
            app_id: Value the entry's ``'app_id'`` must equal.
            component_id: Value the entry's ``'component_id'`` must equal.

        Returns:
            The matching entry's ``'url'``, or ``False`` (after logging the
            traceback) when nothing matches or the listing is malformed.
        """
        try:
            matches = [
                entry['url']
                for entry in components['data']
                if entry['app_id'] == app_id and entry['component_id'] == component_id
            ]
            return matches[0]
        except (IndexError, KeyError, TypeError):
            # IndexError: no match. KeyError/TypeError: the listing does not
            # have the expected {'data': [{...}, ...]} layout.
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def get_data(_request, app_id: str, component_id: str, filter_payload: dict = None) -> list:
        """Fetch a component's data from the app that registered it.

        The component URL is looked up in the registry's ``components``
        listing. The URL is then called with POST (``filter_payload`` sent as
        JSON) when a truthy ``filter_payload`` is given, otherwise with GET.

        Args:
            _request: Object whose ``headers.get`` supplies the ``TenantId`` and
                ``Authorization`` values forwarded on every outgoing call.
            app_id: Id of the app owning the component.
            component_id: Id of the component to fetch.
            filter_payload: Optional JSON body; a falsy value means plain GET.

        Returns:
            The decoded JSON body on HTTP 200; ``[]`` when the component is not
            registered or the response status is not 200; ``False`` when an
            exception is raised (the traceback is logged).
        """
        try:
            headers = {
                "TenantId": _request.headers.get("TenantId"),
                "Authorization": _request.headers.get("Authorization"),
            }

            registry_service_components = ExternalDataService._get_registry_service_data(
                headers, "components"
            )
            service_url = ExternalDataService._get_component_url(
                components=registry_service_components,
                app_id=app_id,
                component_id=component_id,
            )
            if not service_url:
                return []

            if filter_payload:
                method = "POST"
                data = requests.post(
                    url=service_url, headers=headers, json=filter_payload
                )
            else:
                method = "GET"
                data = requests.get(url=service_url, headers=headers)

            if data.status_code == 200:
                return data.json()

            log.warning(f"Could not retrieve data for {component_id} from {app_id}")
            # Report the verb that was actually used, not a hard-coded GET.
            log.warning(f"{method} {service_url} {data.status_code}")
            return []
        except Exception:
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def _get_uploader_url(uploaders: dict, app_id: str, uploader_id: str) -> str:
        """Return the ``upload_url`` of the first entry matching both ids.

        Args:
            uploaders: Registry listing; its ``'data'`` item is iterated.
            app_id: Value the entry's ``'app_id'`` must equal.
            uploader_id: Value the entry's ``'uploader_id'`` must equal.

        Returns:
            The matching entry's ``'upload_url'``, or ``False`` (after logging
            the traceback) when nothing matches or the listing is malformed.
        """
        try:
            matches = [
                entry['upload_url']
                for entry in uploaders['data']
                if entry['app_id'] == app_id and entry['uploader_id'] == uploader_id
            ]
            return matches[0]
        except (IndexError, KeyError, TypeError):
            log.error(traceback.format_exc())
            return False

    @staticmethod
    def get_upload_url(_request: dict, app_id: str, uploader_id: str) -> str:
        """Look up an uploader's upload URL in the registry service.

        Args:
            _request: Mapping whose ``get`` supplies the ``Tenantid`` and
                ``Authorization`` values forwarded to the registry.
            app_id: Id of the app owning the uploader.
            uploader_id: Id of the uploader.

        Returns:
            The upload URL, or ``None`` when no uploader matches.
        """
        # NOTE(review): the key read here is "Tenantid" while get_data reads
        # "TenantId"; presumably the mapping passed in already uses that
        # spelling - confirm against callers before changing it.
        headers = {
            "TenantId": _request.get("Tenantid"),
            "Authorization": _request.get("Authorization"),
        }
        registry_service_uploaders = ExternalDataService._get_registry_service_data(
            headers, "uploaders"
        )

        upload_url = ExternalDataService._get_uploader_url(
            uploaders=registry_service_uploaders,
            app_id=app_id,
            uploader_id=uploader_id,
        )

        # The helper signals "not found" with False; expose that as None.
        return upload_url or None

Static methods

def get_data(_request, app_id: str, component_id: str, filter_payload: dict = None) ‑> list
Expand source code
@staticmethod
def get_data(_request, app_id: str, component_id: str, filter_payload: dict = None) -> list:
    """Fetch a component's data from the app that registered it.

    The component URL is looked up in the registry's ``components`` listing.
    The URL is then called with POST (``filter_payload`` sent as JSON) when a
    truthy ``filter_payload`` is given, otherwise with GET.

    Args:
        _request: Object whose ``headers.get`` supplies the ``TenantId`` and
            ``Authorization`` values forwarded on every outgoing call.
        app_id: Id of the app owning the component.
        component_id: Id of the component to fetch.
        filter_payload: Optional JSON body; a falsy value means plain GET.

    Returns:
        The decoded JSON body on HTTP 200; ``[]`` when no component URL is
        found or the response status is not 200; ``False`` when an exception
        is raised (the traceback is logged).
    """
    try:
        headers = {
            "TenantId": _request.headers.get("TenantId"),
            "Authorization": _request.headers.get("Authorization"),
        }

        registry_service_components = ExternalDataService._get_registry_service_data(
            headers, "components"
        )
        service_url = ExternalDataService._get_component_url(
            components=registry_service_components,
            app_id=app_id,
            component_id=component_id,
        )
        if not service_url:
            return []

        if filter_payload:
            method = "POST"
            data = requests.post(
                url=service_url, headers=headers, json=filter_payload
            )
        else:
            method = "GET"
            data = requests.get(url=service_url, headers=headers)

        if data.status_code == 200:
            return data.json()

        log.warning(f"Could not retrieve data for {component_id} from {app_id}")
        # Report the verb that was actually used, not a hard-coded GET.
        log.warning(f"{method} {service_url} {data.status_code}")
        return []
    except Exception:
        log.error(traceback.format_exc())
        return False
def get_upload_url(_request: dict, app_id: str, uploader_id: str) ‑> str
Expand source code
@staticmethod
def get_upload_url(_request: dict, app_id: str, uploader_id: str) -> str:
    """Look up an uploader's upload URL in the registry service.

    Args:
        _request: Mapping whose ``get`` supplies the ``Tenantid`` and
            ``Authorization`` values forwarded to the registry.
        app_id: Id of the app owning the uploader.
        uploader_id: Id of the uploader.

    Returns:
        The upload URL, or ``None`` when the lookup yields a falsy value.
    """
    # NOTE(review): the key read here is "Tenantid" while get_data reads
    # "TenantId"; presumably the mapping passed in already uses that
    # spelling - confirm against callers before changing it.
    tenant = _request.get("Tenantid")
    auth = _request.get("Authorization")
    forwarded = {"TenantId": tenant, "Authorization": auth}

    listing = ExternalDataService._get_registry_service_data(forwarded, "uploaders")
    found = ExternalDataService._get_uploader_url(
        uploaders=listing,
        app_id=app_id,
        uploader_id=uploader_id,
    )

    # Any falsy lookup result (the helper uses False) is exposed as None.
    return found if found else None