Module licenseware.report_snapshot.report_snapshot

Expand source code
from flask import Request
from datetime import datetime
from licenseware import mongodata
from licenseware.common.constants import envs
from licenseware.utils.logger import log
from marshmallow import Schema, fields, INCLUDE
from pymongo import ASCENDING
import time

#! OUTDATED - pagination needs to be implemented


class AllowAllSchema(Schema):
    class Meta:
        unknown = INCLUDE

    report_snapshot_date = fields.Str(required=True)


class ReportSnapshot:
    def __init__(self, report: type, flask_request: Request):
        self.report = report
        self.request = flask_request
        self.tenant_id = flask_request.headers.get("Tenantid")

    def add_report_components_data(self):

        report_metadata, status_code = self.report.return_json_payload()

        rcomponents = []
        for comp in self.report.components:
            comp_payload = comp.get_registration_payload()
            comp_payload["component_data"] = comp.get_data(self.request)
            rcomponents.append(comp_payload)

        report_metadata["report_components"] = rcomponents

        return report_metadata

    def generate_snapshot(self):

        report_data = self.add_report_components_data()

        report_data["tenant_id"] = self.tenant_id
        report_data["report_snapshot_date"] = datetime.utcnow().isoformat()

        mongodata.insert(
            schema=AllowAllSchema,
            collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
            data=report_data,
        )

        return report_data

    # def get_latest_snapshot(self):
    #
    #     start = time.time()
    #
    #     # Getting latest report snapshot
    #     snapshots_collection = mongodata.get_collection(envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME)
    #
    #     latest_snapshot = snapshots_collection.find(
    #         {
    #             'tenant_id': self.tenant_id,
    #             'report_id': self.report.report_id
    #         }
    #     ).sort("report_snapshot_date", ASCENDING).limit(1)
    #
    #     latest_snapshot = list(latest_snapshot)
    #
    #     snapshot_outdated = True
    #     if latest_snapshot:
    #         latest_snapshot = latest_snapshot[0]
    #         latest_snapshot.pop('_id')  # ObjectId is not serializable
    #
    #         counted = mongodata.document_count(
    #             match={
    #                 'tenant_id': self.tenant_id,
    #                 'updated_at': {'$gt': latest_snapshot['report_snapshot_date']}
    #             },
    #             collection=envs.MONGO_COLLECTION_ANALYSIS_NAME
    #         )
    #
    #         snapshot_outdated = True if counted > 0 else False
    #
    #     if snapshot_outdated:
    #         log.info("Returning generated report snapshot")
    #         report_data = self.generate_snapshot()
    #
    #         end = time.time()
    #         log.warning(f"Time elapsed: {end - start}")
    #         # 0.6979179382324219 ~ 0.70
    #         return report_data
    #
    #     end = time.time()
    #     log.warning(f"Time elapsed: {end - start}")
    #     # 0.02915167808532715 ~ 0.030
    #     log.info("Returning latest report snapshot")
    #     return latest_snapshot

    def get_report_data(self):

        report_data = self.generate_snapshot()

        return report_data

Classes

class AllowAllSchema (*, only: Union[Sequence[str], Set[str], None] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict[~KT, ~VT]] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)

Base schema class with which to define custom schemas.

Example usage:

.. code-block:: python

import datetime as dt
from dataclasses import dataclass

from marshmallow import Schema, fields


@dataclass
class Album:
    title: str
    release_date: dt.date


class AlbumSchema(Schema):
    title = fields.Str()
    release_date = fields.Date()


album = Album("Beggars Banquet", dt.date(1968, 12, 6))
schema = AlbumSchema()
data = schema.dump(album)
data  # {'release_date': '1968-12-06', 'title': 'Beggars Banquet'}

:param only: Whitelist of the declared fields to select when instantiating the Schema. If None, all fields are used. Nested fields can be represented with dot delimiters. :param exclude: Blacklist of the declared fields to exclude when instantiating the Schema. If a field appears in both only and exclude, it is not used. Nested fields can be represented with dot delimiters. :param many: Should be set to True if obj is a collection so that the object will be serialized to a list. :param context: Optional context passed to :class:fields.Method and :class:fields.Function fields. :param load_only: Fields to skip during serialization (write-only fields) :param dump_only: Fields to skip during deserialization (read-only fields) :param partial: Whether to ignore missing fields and not require any fields declared. Propagates down to Nested fields as well. If its value is an iterable, only missing fields listed in that iterable will be ignored. Use dot delimiters to specify nested fields. :param unknown: Whether to exclude, include, or raise an error for unknown fields in the data. Use EXCLUDE, INCLUDE or RAISE.

Changed in version: 3.0.0

prefix parameter removed.

Changed in version: 2.0.0

__validators__, __preprocessors__, and __data_handlers__ are removed in favor of marshmallow.decorators.validates_schema, marshmallow.decorators.pre_load and marshmallow.decorators.post_dump. __accessor__ and __error_handler__ are deprecated. Implement the handle_error and get_attribute methods instead.

Expand source code
class AllowAllSchema(Schema):
    class Meta:
        unknown = INCLUDE

    report_snapshot_date = fields.Str(required=True)

Ancestors

  • marshmallow.schema.Schema
  • marshmallow.base.SchemaABC

Class variables

var Meta
var opts
var report_snapshot_date
class ReportSnapshot (report: type, flask_request: flask.wrappers.Request)
Expand source code
class ReportSnapshot:
    def __init__(self, report: type, flask_request: Request):
        self.report = report
        self.request = flask_request
        self.tenant_id = flask_request.headers.get("Tenantid")

    def add_report_components_data(self):

        report_metadata, status_code = self.report.return_json_payload()

        rcomponents = []
        for comp in self.report.components:
            comp_payload = comp.get_registration_payload()
            comp_payload["component_data"] = comp.get_data(self.request)
            rcomponents.append(comp_payload)

        report_metadata["report_components"] = rcomponents

        return report_metadata

    def generate_snapshot(self):

        report_data = self.add_report_components_data()

        report_data["tenant_id"] = self.tenant_id
        report_data["report_snapshot_date"] = datetime.utcnow().isoformat()

        mongodata.insert(
            schema=AllowAllSchema,
            collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
            data=report_data,
        )

        return report_data

    # def get_latest_snapshot(self):
    #
    #     start = time.time()
    #
    #     # Getting latest report snapshot
    #     snapshots_collection = mongodata.get_collection(envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME)
    #
    #     latest_snapshot = snapshots_collection.find(
    #         {
    #             'tenant_id': self.tenant_id,
    #             'report_id': self.report.report_id
    #         }
    #     ).sort("report_snapshot_date", ASCENDING).limit(1)
    #
    #     latest_snapshot = list(latest_snapshot)
    #
    #     snapshot_outdated = True
    #     if latest_snapshot:
    #         latest_snapshot = latest_snapshot[0]
    #         latest_snapshot.pop('_id')  # ObjectId is not serializable
    #
    #         counted = mongodata.document_count(
    #             match={
    #                 'tenant_id': self.tenant_id,
    #                 'updated_at': {'$gt': latest_snapshot['report_snapshot_date']}
    #             },
    #             collection=envs.MONGO_COLLECTION_ANALYSIS_NAME
    #         )
    #
    #         snapshot_outdated = True if counted > 0 else False
    #
    #     if snapshot_outdated:
    #         log.info("Returning generated report snapshot")
    #         report_data = self.generate_snapshot()
    #
    #         end = time.time()
    #         log.warning(f"Time elapsed: {end - start}")
    #         # 0.6979179382324219 ~ 0.70
    #         return report_data
    #
    #     end = time.time()
    #     log.warning(f"Time elapsed: {end - start}")
    #     # 0.02915167808532715 ~ 0.030
    #     log.info("Returning latest report snapshot")
    #     return latest_snapshot

    def get_report_data(self):

        report_data = self.generate_snapshot()

        return report_data

Methods

def add_report_components_data(self)
Expand source code
def add_report_components_data(self):

    report_metadata, status_code = self.report.return_json_payload()

    rcomponents = []
    for comp in self.report.components:
        comp_payload = comp.get_registration_payload()
        comp_payload["component_data"] = comp.get_data(self.request)
        rcomponents.append(comp_payload)

    report_metadata["report_components"] = rcomponents

    return report_metadata
def generate_snapshot(self)
Expand source code
def generate_snapshot(self):

    report_data = self.add_report_components_data()

    report_data["tenant_id"] = self.tenant_id
    report_data["report_snapshot_date"] = datetime.utcnow().isoformat()

    mongodata.insert(
        schema=AllowAllSchema,
        collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
        data=report_data,
    )

    return report_data
def get_report_data(self)
Expand source code
def get_report_data(self):

    report_data = self.generate_snapshot()

    return report_data