Module licenseware.report_snapshot.report_snapshot
Expand source code
from flask import Request
from datetime import datetime
from licenseware import mongodata
from licenseware.common.constants import envs
from licenseware.utils.logger import log
from marshmallow import Schema, fields, INCLUDE
from pymongo import ASCENDING
import time
#! OUTDATED - pagination needs to be implemented
class AllowAllSchema(Schema):
    """Permissive schema used when a report snapshot is inserted.

    Only ``report_snapshot_date`` is declared, and it is required. Every
    other key is governed by ``Meta.unknown``.
    """

    # The single declared field: a required string.
    report_snapshot_date = fields.Str(required=True)

    class Meta:
        # INCLUDE: keys that are not declared above are accepted and kept.
        unknown = INCLUDE
class ReportSnapshot:
    """Assemble a report payload with its component data and store it.

    The instance keeps the report object, the incoming Flask request and the
    tenant id read from the request's ``Tenantid`` header.
    """

    def __init__(self, report: type, flask_request: Request):
        """Remember the report and request, and read the tenant id.

        :param report: object exposing ``return_json_payload()`` and
            ``components`` (both are used by ``add_report_components_data``).
        :param flask_request: request whose ``Tenantid`` header identifies
            the tenant; ``tenant_id`` is ``None`` when the header is absent.
        """
        self.report = report
        self.request = flask_request
        self.tenant_id = flask_request.headers.get("Tenantid")

    def add_report_components_data(self):
        """Return the report payload extended with ``report_components``.

        Each entry is the component's registration payload with the result
        of ``comp.get_data(self.request)`` stored under ``component_data``.
        """
        # return_json_payload() yields (payload, status_code); the status
        # code is not needed here.
        report_metadata, _ = self.report.return_json_payload()

        rcomponents = []
        for comp in self.report.components:
            comp_payload = comp.get_registration_payload()
            comp_payload["component_data"] = comp.get_data(self.request)
            rcomponents.append(comp_payload)

        report_metadata["report_components"] = rcomponents
        return report_metadata

    def generate_snapshot(self):
        """Build the report data, stamp it, insert it and return it.

        ``tenant_id`` and ``report_snapshot_date`` (an ISO-8601 string of the
        current UTC time, without offset) are added before the insert into
        the report snapshots collection.
        """
        # Function-scope import so the module's import block stays untouched.
        from datetime import timezone

        report_data = self.add_report_components_data()
        report_data["tenant_id"] = self.tenant_id
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the stored string in the same offset-less format as before.
        report_data["report_snapshot_date"] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )

        mongodata.insert(
            schema=AllowAllSchema,
            collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
            data=report_data,
        )
        return report_data

    # Disabled implementation that reused a stored snapshot when no analysis
    # document was updated after it. Kept for reference.
    # NOTE(review): sorting ASCENDING with limit(1) returns the OLDEST
    # snapshot; DESCENDING is presumably what was intended - confirm before
    # re-enabling.
    #
    # def get_latest_snapshot(self):
    #
    #     start = time.time()
    #
    #     # Getting latest report snapshot
    #     snapshots_collection = mongodata.get_collection(envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME)
    #
    #     latest_snapshot = snapshots_collection.find(
    #         {
    #             'tenant_id': self.tenant_id,
    #             'report_id': self.report.report_id
    #         }
    #     ).sort("report_snapshot_date", ASCENDING).limit(1)
    #
    #     latest_snapshot = list(latest_snapshot)
    #
    #     snapshot_outdated = True
    #     if latest_snapshot:
    #         latest_snapshot = latest_snapshot[0]
    #         latest_snapshot.pop('_id')  # ObjectId is not serializable
    #
    #         counted = mongodata.document_count(
    #             match={
    #                 'tenant_id': self.tenant_id,
    #                 'updated_at': {'$gt': latest_snapshot['report_snapshot_date']}
    #             },
    #             collection=envs.MONGO_COLLECTION_ANALYSIS_NAME
    #         )
    #
    #         snapshot_outdated = True if counted > 0 else False
    #
    #     if snapshot_outdated:
    #         log.info("Returning generated report snapshot")
    #         report_data = self.generate_snapshot()
    #
    #         end = time.time()
    #         log.warning(f"Time elapsed: {end - start}")
    #         # 0.6979179382324219 ~ 0.70
    #         return report_data
    #
    #     end = time.time()
    #     log.warning(f"Time elapsed: {end - start}")
    #     # 0.02915167808532715 ~ 0.030
    #     log.info("Returning latest report snapshot")
    #     return latest_snapshot

    def get_report_data(self):
        """Return report data; currently this always generates a new snapshot."""
        return self.generate_snapshot()
Classes
class AllowAllSchema (*, only: Union[Sequence[str], Set[str], None] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict[~KT, ~VT]] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
-
Base schema class with which to define custom schemas.
Example usage:
.. code-block:: python
    import datetime as dt
    from dataclasses import dataclass

    from marshmallow import Schema, fields


    @dataclass
    class Album:
        title: str
        release_date: dt.date


    class AlbumSchema(Schema):
        title = fields.Str()
        release_date = fields.Date()


    album = Album("Beggars Banquet", dt.date(1968, 12, 6))
    schema = AlbumSchema()
    data = schema.dump(album)
    data  # {'release_date': '1968-12-06', 'title': 'Beggars Banquet'}
:param only: Whitelist of the declared fields to select when instantiating the Schema. If None, all fields are used. Nested fields can be represented with dot delimiters.
:param exclude: Blacklist of the declared fields to exclude when instantiating the Schema. If a field appears in both `only` and `exclude`, it is not used. Nested fields can be represented with dot delimiters.
:param many: Should be set to `True` if `obj` is a collection so that the object will be serialized to a list.
:param context: Optional context passed to :class:`fields.Method` and :class:`fields.Function` fields.
:param load_only: Fields to skip during serialization (write-only fields).
:param dump_only: Fields to skip during deserialization (read-only fields).
:param partial: Whether to ignore missing fields and not require any fields declared. Propagates down to `Nested` fields as well. If its value is an iterable, only missing fields listed in that iterable will be ignored. Use dot delimiters to specify nested fields.
:param unknown: Whether to exclude, include, or raise an error for unknown fields in the data. Use `EXCLUDE`, `INCLUDE` or `RAISE`.

Changed in version 3.0.0: `prefix` parameter removed.

Changed in version 2.0.0: `__validators__`, `__preprocessors__`, and `__data_handlers__` are removed in favor of `marshmallow.decorators.validates_schema`, `marshmallow.decorators.pre_load` and `marshmallow.decorators.post_dump`. `__accessor__` and `__error_handler__` are deprecated. Implement the `handle_error` and `get_attribute` methods instead.

Expand source code
class AllowAllSchema(Schema):
    """Permissive schema with one required field, ``report_snapshot_date``.

    Every other key is governed by ``Meta.unknown``.
    """

    # The single declared field: a required string.
    report_snapshot_date = fields.Str(required=True)

    class Meta:
        # INCLUDE: keys that are not declared above are accepted and kept.
        unknown = INCLUDE
Ancestors
- marshmallow.schema.Schema
- marshmallow.base.SchemaABC
Class variables
var Meta
var opts
var report_snapshot_date
class ReportSnapshot (report: type, flask_request: flask.wrappers.Request)
-
Expand source code
class ReportSnapshot:
    """Assemble a report payload with its component data and store it.

    The instance keeps the report object, the incoming Flask request and the
    tenant id read from the request's ``Tenantid`` header.
    """

    def __init__(self, report: type, flask_request: Request):
        """Remember the report and request, and read the tenant id.

        :param report: object exposing ``return_json_payload()`` and
            ``components`` (both are used by ``add_report_components_data``).
        :param flask_request: request whose ``Tenantid`` header identifies
            the tenant; ``tenant_id`` is ``None`` when the header is absent.
        """
        self.report = report
        self.request = flask_request
        self.tenant_id = flask_request.headers.get("Tenantid")

    def add_report_components_data(self):
        """Return the report payload extended with ``report_components``.

        Each entry is the component's registration payload with the result
        of ``comp.get_data(self.request)`` stored under ``component_data``.
        """
        # return_json_payload() yields (payload, status_code); the status
        # code is not needed here.
        report_metadata, _ = self.report.return_json_payload()

        rcomponents = []
        for comp in self.report.components:
            comp_payload = comp.get_registration_payload()
            comp_payload["component_data"] = comp.get_data(self.request)
            rcomponents.append(comp_payload)

        report_metadata["report_components"] = rcomponents
        return report_metadata

    def generate_snapshot(self):
        """Build the report data, stamp it, insert it and return it.

        ``tenant_id`` and ``report_snapshot_date`` (an ISO-8601 string of the
        current UTC time, without offset) are added before the insert into
        the report snapshots collection.
        """
        # Function-scope import so the module's import block stays untouched.
        from datetime import timezone

        report_data = self.add_report_components_data()
        report_data["tenant_id"] = self.tenant_id
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the stored string in the same offset-less format as before.
        report_data["report_snapshot_date"] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )

        mongodata.insert(
            schema=AllowAllSchema,
            collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
            data=report_data,
        )
        return report_data

    # Disabled implementation that reused a stored snapshot when no analysis
    # document was updated after it. Kept for reference.
    # NOTE(review): sorting ASCENDING with limit(1) returns the OLDEST
    # snapshot; DESCENDING is presumably what was intended - confirm before
    # re-enabling.
    #
    # def get_latest_snapshot(self):
    #
    #     start = time.time()
    #
    #     # Getting latest report snapshot
    #     snapshots_collection = mongodata.get_collection(envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME)
    #
    #     latest_snapshot = snapshots_collection.find(
    #         {
    #             'tenant_id': self.tenant_id,
    #             'report_id': self.report.report_id
    #         }
    #     ).sort("report_snapshot_date", ASCENDING).limit(1)
    #
    #     latest_snapshot = list(latest_snapshot)
    #
    #     snapshot_outdated = True
    #     if latest_snapshot:
    #         latest_snapshot = latest_snapshot[0]
    #         latest_snapshot.pop('_id')  # ObjectId is not serializable
    #
    #         counted = mongodata.document_count(
    #             match={
    #                 'tenant_id': self.tenant_id,
    #                 'updated_at': {'$gt': latest_snapshot['report_snapshot_date']}
    #             },
    #             collection=envs.MONGO_COLLECTION_ANALYSIS_NAME
    #         )
    #
    #         snapshot_outdated = True if counted > 0 else False
    #
    #     if snapshot_outdated:
    #         log.info("Returning generated report snapshot")
    #         report_data = self.generate_snapshot()
    #
    #         end = time.time()
    #         log.warning(f"Time elapsed: {end - start}")
    #         # 0.6979179382324219 ~ 0.70
    #         return report_data
    #
    #     end = time.time()
    #     log.warning(f"Time elapsed: {end - start}")
    #     # 0.02915167808532715 ~ 0.030
    #     log.info("Returning latest report snapshot")
    #     return latest_snapshot

    def get_report_data(self):
        """Return report data; currently this always generates a new snapshot."""
        return self.generate_snapshot()
Methods
def add_report_components_data(self)
-
Expand source code
def add_report_components_data(self):
    """Return the report payload extended with ``report_components``.

    Each entry is the component's registration payload with the result of
    ``comp.get_data(self.request)`` stored under ``component_data``.
    """
    # return_json_payload() yields (payload, status_code); the status code
    # is not needed here.
    report_metadata, _ = self.report.return_json_payload()

    rcomponents = []
    for comp in self.report.components:
        comp_payload = comp.get_registration_payload()
        comp_payload["component_data"] = comp.get_data(self.request)
        rcomponents.append(comp_payload)

    report_metadata["report_components"] = rcomponents
    return report_metadata
def generate_snapshot(self)
-
Expand source code
def generate_snapshot(self):
    """Build the report data, stamp it, insert it and return it.

    ``tenant_id`` and ``report_snapshot_date`` (an ISO-8601 string of the
    current UTC time, without offset) are added before the insert into the
    report snapshots collection.
    """
    # Function-scope import so the module's import block stays untouched.
    from datetime import timezone

    report_data = self.add_report_components_data()
    report_data["tenant_id"] = self.tenant_id
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
    # keeps the stored string in the same offset-less format as before.
    report_data["report_snapshot_date"] = (
        datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    )

    mongodata.insert(
        schema=AllowAllSchema,
        collection=envs.MONGO_COLLECTION_REPORT_SNAPSHOTS_NAME,
        data=report_data,
    )
    return report_data
def get_report_data(self)
-
Expand source code
def get_report_data(self):
    """Return report data; currently this always generates a new snapshot."""
    return self.generate_snapshot()