Module licenseware.app_builder.data_sync_namespace.data_sync_namespace
Expand source code
from marshmallow import Schema
from flask_restx import Namespace
from licenseware.mongodata import mongodata
from licenseware.common.constants import envs
from licenseware.decorators.auth_decorators import machine_check
from licenseware.schema_namespace import MongoCrud, SchemaNamespace
class MongoDataSync(MongoCrud):
    """Read-only Mongo accessor used by the data sync route.

    Only ``get_data`` is overridden here; the query itself is built by
    ``get_query``, which is not defined in this class and is expected to
    come from ``MongoCrud``.
    """

    def __init__(self, schema: Schema, collection: str):
        """Remember the schema and the name of the collection to read from."""
        self.schema, self.collection = schema, collection

    def get_data(self, flask_request):
        """Return every document of the collection matching the request query.

        Raises:
            Exception: when the request has no (or an empty) ``TenantId`` header.
        """
        # Guard clause: a tenant must be supplied before anything is queried.
        if not flask_request.headers.get("TenantId"):
            raise Exception("TenantId header is required")

        mongo_filter = self.get_query(flask_request)
        cursor = mongodata.get_collection(self.collection).find(mongo_filter)
        # Materialize the cursor so the caller receives a plain list.
        return list(cursor)
class DataSyncRoute(SchemaNamespace):
    """
    Data sync endpoint protected by machine authorization.

    Takes a schema and, optionally, a collection name; when no collection
    name is given, ``envs.MONGO_COLLECTION_DATA_NAME`` is used instead.
    The route is registered for GET only and reads its data through
    ``MongoDataSync``, whose ``get_data`` requires a ``TenantId`` header.
    """

    def __init__(self, ns: Namespace, schema: Schema, collection_name: str = None):
        route_settings = {
            "schema": schema,
            "collection": collection_name or envs.MONGO_COLLECTION_DATA_NAME,
            "methods": ["GET"],
            "decorators": [machine_check],
            "mongo_crud_class": MongoDataSync,
            "namespace": ns,
        }
        # Store each setting on the instance first ...
        for attr_name, attr_value in route_settings.items():
            setattr(self, attr_name, attr_value)
        # ... then forward the instance attributes as keyword arguments.
        super().__init__(**vars(self))
def get_data_sync_namespace(ns: Namespace, data_sync_schema: Schema):
    """Build the data sync route for ``data_sync_schema`` on ``ns``.

    The schema is wrapped in a local ``DataSyncSchema`` subclass. The
    collection name is taken from ``data_sync_schema.Meta.mongo_collection_name``
    when that attribute exists; otherwise ``None`` is passed to
    ``DataSyncRoute``.

    Returns:
        The result of ``DataSyncRoute(...).initialize()``.
    """

    class DataSyncSchema(data_sync_schema):
        ...

    # A missing ``Meta`` or a missing ``mongo_collection_name`` both yield None.
    schema_meta = getattr(data_sync_schema, 'Meta', None)
    collection_name = getattr(schema_meta, 'mongo_collection_name', None)

    route = DataSyncRoute(ns, DataSyncSchema, collection_name)
    return route.initialize()
Functions
def get_data_sync_namespace(ns: flask_restx.namespace.Namespace, data_sync_schema: marshmallow.schema.Schema)
-
Expand source code
def get_data_sync_namespace(ns: Namespace, data_sync_schema: Schema):
    """Build the data sync route for ``data_sync_schema`` on ``ns``.

    The schema is wrapped in a local ``DataSyncSchema`` subclass. The
    collection name is taken from ``data_sync_schema.Meta.mongo_collection_name``
    when that attribute exists; otherwise ``None`` is passed to
    ``DataSyncRoute``.

    Returns:
        The result of ``DataSyncRoute(...).initialize()``.
    """

    class DataSyncSchema(data_sync_schema):
        ...

    # A missing ``Meta`` or a missing ``mongo_collection_name`` both yield None.
    schema_meta = getattr(data_sync_schema, 'Meta', None)
    collection_name = getattr(schema_meta, 'mongo_collection_name', None)

    route = DataSyncRoute(ns, DataSyncSchema, collection_name)
    return route.initialize()
Classes
class DataSyncRoute (ns: flask_restx.namespace.Namespace, schema: marshmallow.schema.Schema, collection_name: str = None)
-
Create a data sync endpoint with machine authorization. Receives a schema and, optionally, a collection name (the mongo data collection is used by default). Only implements the get_data functionality of MongoCrud (read-only). Requires a TenantId header along with the machine authorization.
Expand source code
class DataSyncRoute(SchemaNamespace):
    """
    Data sync endpoint protected by machine authorization.

    Takes a schema and, optionally, a collection name; when no collection
    name is given, ``envs.MONGO_COLLECTION_DATA_NAME`` is used instead.
    The route is registered for GET only and reads its data through
    ``MongoDataSync``, whose ``get_data`` requires a ``TenantId`` header.
    """

    def __init__(self, ns: Namespace, schema: Schema, collection_name: str = None):
        route_settings = {
            "schema": schema,
            "collection": collection_name or envs.MONGO_COLLECTION_DATA_NAME,
            "methods": ["GET"],
            "decorators": [machine_check],
            "mongo_crud_class": MongoDataSync,
            "namespace": ns,
        }
        # Store each setting on the instance first ...
        for attr_name, attr_value in route_settings.items():
            setattr(self, attr_name, attr_value)
        # ... then forward the instance attributes as keyword arguments.
        super().__init__(**vars(self))
Ancestors
Inherited members
class MongoDataSync (schema: marshmallow.schema.Schema, collection: str)
-
In this class we are defining basic CRUD operations for MongoDB.
All methods must use the `staticmethod` decorator.
Create indexes will assume simple_indexes are not unique and compound_indexes are unique. Indexes are provided on serializer metadata (simple_indexes, compound_indexes).
Expand source code
class MongoDataSync(MongoCrud):
    """Read-only Mongo accessor used by the data sync route.

    Only ``get_data`` is overridden here; the query itself is built by
    ``get_query``, which is not defined in this class and is expected to
    come from ``MongoCrud``.
    """

    def __init__(self, schema: Schema, collection: str):
        """Remember the schema and the name of the collection to read from."""
        self.schema, self.collection = schema, collection

    def get_data(self, flask_request):
        """Return every document of the collection matching the request query.

        Raises:
            Exception: when the request has no (or an empty) ``TenantId`` header.
        """
        # Guard clause: a tenant must be supplied before anything is queried.
        if not flask_request.headers.get("TenantId"):
            raise Exception("TenantId header is required")

        mongo_filter = self.get_query(flask_request)
        cursor = mongodata.get_collection(self.collection).find(mongo_filter)
        # Materialize the cursor so the caller receives a plain list.
        return list(cursor)
Ancestors
Methods
def get_data(self, flask_request)
-
Expand source code
def get_data(self, flask_request):
    """Return every document of the collection matching the request query.

    The filter comes from ``self.get_query(flask_request)`` and is run
    against the collection named by ``self.collection``.

    Raises:
        Exception: when the request has no (or an empty) ``TenantId`` header.
    """
    # Guard clause: a tenant must be supplied before anything is queried.
    if not flask_request.headers.get("TenantId"):
        raise Exception("TenantId header is required")

    mongo_filter = self.get_query(flask_request)
    cursor = mongodata.get_collection(self.collection).find(mongo_filter)
    # Materialize the cursor so the caller receives a plain list.
    return list(cursor)