Module licenseware.schema_namespace.schema_namespace
Class SchemaNamespace
can be used to generate a restx namespace from a marshmallow schema.
A minimal example would be this:
from marshmallow import Schema, fields
from licenseware.schema_namespace import SchemaNamespace
class UserSchema(Schema):
''' Here is some namespace doc for user namespace '''
name = fields.Str(required=True)
occupation = fields.Str(required=True)
UserNs = SchemaNamespace(
schema=UserSchema,
collection="CustomCollection",
).initialize()
# Later, adding the namespace generated from schema to our App
from some_controller import UserNs
ifmp_app.add_namespace(UserNs)
The crud api along with docs will be generated from the provided schema.
Below is the case where we need to override the default CRUD methods provided by the MongoCrud class.
from marshmallow import Schema, fields
from licenseware.mongodata import mongodata
from licenseware.common.constants import envs
from licenseware.utils.logger import log
from licenseware.schema_namespace import SchemaNamespace, MongoCrud
# Defining our schema
class UserSchema(Schema):
name = fields.Str(required=True)
occupation = fields.Str(required=True)
# Overriding mongo crud methods
class UserOperations(MongoCrud):
def __init__(self, schema: Schema, collection: str):
self.schema = schema
self.collection = collection
super().__init__(schema, collection)
def get_data(self, flask_request):
query = self.get_query(flask_request)
results = mongodata.fetch(match=query, collection=self.collection)
return {"status": states.SUCCESS, "message": results}, 200
def post_data(self, flask_request):
query = UserOperations.get_query(flask_request)
data = dict(query, **{
"updated_at": datetime.datetime.utcnow().isoformat()}
)
inserted_docs = mongodata.insert(
schema=self.schema,
collection=self.collection,
data=data
)
return inserted_docs
def put_data(self, flask_request):
query = self.get_query(flask_request)
updated_docs = mongodata.update(
schema=self.schema,
match=query,
new_data=dict(query, **{"updated_at": datetime.datetime.utcnow().isoformat()}),
collection=self.collection,
append=False
)
if updated_docs == 0:
return {"status": states.SUCCESS, "message": "Query didn't matched any data"}, 400
return {"status": states.SUCCESS, "message": ""}, 200
def delete_data(self, flask_request):
query = self.get_query(flask_request)
deleted_docs = mongodata.delete(match=query, collection=self.collection)
return deleted_docs
# A restx namespace is generated on instantiation
UserNs = SchemaNamespace(
schema=UserSchema,
collection="CustomCollection",
mongo_crud_class=UserOperations,
decorators=[]
).initialize()
# Later, adding the namespace generated from schema to our App
from some_controller import UserNs
ifmp_app.add_namespace(UserNs)
Expand source code
"""
Class `SchemaNamespace` can be used to generate a restx namespace from a marshmallow schema.
A minimal example would be this:
```py
from marshmallow import Schema, fields
from licenseware.schema_namespace import SchemaNamespace
class UserSchema(Schema):
''' Here is some namespace doc for user namespace '''
name = fields.Str(required=True)
occupation = fields.Str(required=True)
UserNs = SchemaNamespace(
schema=UserSchema,
collection="CustomCollection",
).initialize()
# Later, adding the namespace generated from schema to our App
from some_controller import UserNs
ifmp_app.add_namespace(UserNs)
```
The crud api along with docs will be generated from the provided schema.
Below is the case where we need to override the default CRUD methods provided by the `MongoCrud` class.
```py
from marshmallow import Schema, fields
from licenseware.mongodata import mongodata
from licenseware.common.constants import envs
from licenseware.utils.logger import log
from licenseware.schema_namespace import SchemaNamespace, MongoCrud
# Defining our schema
class UserSchema(Schema):
name = fields.Str(required=True)
occupation = fields.Str(required=True)
# Overriding mongo crud methods
class UserOperations(MongoCrud):
def __init__(self, schema: Schema, collection: str):
self.schema = schema
self.collection = collection
super().__init__(schema, collection)
def get_data(self, flask_request):
query = self.get_query(flask_request)
results = mongodata.fetch(match=query, collection=self.collection)
return {"status": states.SUCCESS, "message": results}, 200
def post_data(self, flask_request):
query = UserOperations.get_query(flask_request)
data = dict(query, **{
"updated_at": datetime.datetime.utcnow().isoformat()}
)
inserted_docs = mongodata.insert(
schema=self.schema,
collection=self.collection,
data=data
)
return inserted_docs
def put_data(self, flask_request):
query = self.get_query(flask_request)
updated_docs = mongodata.update(
schema=self.schema,
match=query,
new_data=dict(query, **{"updated_at": datetime.datetime.utcnow().isoformat()}),
collection=self.collection,
append=False
)
if updated_docs == 0:
return {"status": states.SUCCESS, "message": "Query didn't matched any data"}, 400
return {"status": states.SUCCESS, "message": ""}, 200
def delete_data(self, flask_request):
query = self.get_query(flask_request)
deleted_docs = mongodata.delete(match=query, collection=self.collection)
return deleted_docs
# A restx namespace is generated on instantiation
UserNs = SchemaNamespace(
schema=UserSchema,
collection="CustomCollection",
mongo_crud_class=UserOperations,
decorators=[]
).initialize()
# Later, adding the namespace generated from schema to our App
from some_controller import UserNs
ifmp_app.add_namespace(UserNs)
```
"""
from marshmallow import Schema, fields
# from marshmallow_jsonschema import JSONSchema
from flask_restx import Namespace, Resource
from licenseware.common.validators import validate_uuid4
from licenseware.common.constants import envs
from licenseware.decorators.auth_decorators import authorization_check
from licenseware.decorators.failsafe_decorator import failsafe
from licenseware.utils.miscellaneous import swagger_authorization_header, http_methods
from licenseware.utils.logger import log
from licenseware import mongodata
from licenseware.common.marshmallow_restx_converter import marshmallow_to_restx_model
from flask import request
from .mongo_crud import MongoCrud
# Every api namespace generated with SchemaNamespace must have `tenant_id` and `updated_at` fields
class BaseSchema(Schema):
    """Fields shared by every schema served through ``SchemaNamespace``.

    ``SchemaNamespace`` mixes this class into the schema it receives, so each
    generated namespace carries a ``tenant_id`` and an ``updated_at`` field.
    """

    # Mandatory; the value is checked by `validate_uuid4`.
    tenant_id = fields.String(validate=validate_uuid4, required=True)
    # Optional string field.
    updated_at = fields.String(required=False)
class SchemaNamespace:
    """Generate a flask-restx ``Namespace`` with CRUD routes from a marshmallow schema.

    The received schema is combined with ``BaseSchema`` so the served schema
    also declares ``tenant_id`` and ``updated_at``.  When the schema's ``Meta``
    class declares ``collection`` or ``methods`` those values win over the
    constructor arguments.  ``Meta.simple_indexes`` and
    ``Meta.compound_indexes`` are read by :meth:`create_indexes`.

    Args:
        schema: marshmallow ``Schema`` subclass (the class, not an instance).
        collection: Collection name used when ``Meta.collection`` is not
            declared; if empty, ``envs.MONGO_COLLECTION_DATA_NAME`` is used.
        methods: HTTP verbs to enable when ``Meta.methods`` is not declared.
        decorators: Decorators handed to the generated ``Namespace``.
        disable_model: When ``True`` an empty restx model is registered
            instead of one converted from the schema.
        authorizations: Passed to the ``Namespace``; its keys are also used as
            the namespace ``security`` list.
        mongo_crud_class: Class instantiated with ``schema`` and ``collection``;
            its ``get_data``/``post_data``/``put_data``/``delete_data`` methods
            handle the requests.
        namespace: Stored as ``self.namespace``.

    Raises:
        TypeError: If ``schema`` is not a class.
    """

    def __init__(
        self,
        schema: type = None,
        collection: str = None,
        methods: list = http_methods,
        decorators: list = [authorization_check],
        disable_model: bool = False,
        authorizations: dict = swagger_authorization_header,
        mongo_crud_class: type = MongoCrud,
        namespace: Namespace = None
    ):
        # Fail early with a readable message instead of an AttributeError on
        # `__name__` further down.
        if not isinstance(schema, type):
            raise TypeError(
                f"`schema` must be a marshmallow Schema class, got {schema!r}"
            )

        # The namespace description comes from the *received* schema; the
        # combined class created below has no docstring of its own.
        self.doc = schema.__doc__

        # Adding `tenant_id` and `updated_at` fields to received schema
        self.schema = type(schema.__name__, (schema, BaseSchema,), {})

        # Only a missing Meta attribute means "not declared"; anything else
        # should surface instead of being swallowed.
        try:
            self.collection = self.schema.Meta.collection
        except AttributeError:
            self.collection = collection or envs.MONGO_COLLECTION_DATA_NAME

        try:
            self.methods = self.schema.Meta.methods
        except AttributeError:
            self.methods = methods

        # Copy so the default list in the signature is never shared with (or
        # mutated through) a generated Namespace.
        self.decorators = list(decorators) if decorators else []
        self.disable_model = disable_model
        self.authorizations = authorizations
        self.mongo_crud_class = mongo_crud_class
        # NOTE(review): stored but not consulted by `create_namespace`, which
        # always builds a new Namespace -- confirm whether it should be reused.
        self.namespace = namespace

        self.schema_name = self.schema.__name__
        self.name = self.schema_name.replace("Schema", "")
        self.path = "/" + self.name.lower()

    def initialize(self) -> Namespace:
        """ Create restx api namespace from schema """
        self.create_indexes()
        return self.create_resources()

    def create_namespace(self) -> Namespace:
        """Build the ``Namespace`` from the name, path, doc and auth settings.

        The description is the received schema's docstring, or a generated
        sentence naming the schema when it has none.
        """
        return Namespace(
            name=self.name,
            path=self.path,
            description=self.doc or f"API is generated from {self.schema_name}",
            decorators=self.decorators,
            authorizations=self.authorizations,
            security=list(self.authorizations.keys())
        )

    def create_resources(self) -> Namespace:
        """Create the namespace and register the CRUD resource on it.

        Every handler forwards the flask ``request`` to the matching method of
        the ``mongo_crud_class`` instance.  A verb missing from
        ``self.methods`` answers ``"METHOD NOT ALLOWED", 405`` and its handler
        is passed to ``ns.hide``.

        Returns:
            The ``Namespace`` with ``SchemaResource`` registered at ``""``.
        """
        ns = self.create_namespace()

        if self.disable_model is True:
            resource_fields = ns.model(self.name, {})
        else:
            resource_fields = marshmallow_to_restx_model(ns, self.schema)

        allowed_methods = self.methods
        data_service = self.mongo_crud_class(schema=self.schema, collection=self.collection)

        class SchemaResource(Resource):

            @failsafe(fail_code=500)
            @ns.doc(id="Make a GET request to FETCH some data")
            @ns.param('_id', 'get data by id')
            @ns.response(code=200, description="A list of:", model=[resource_fields])
            @ns.response(code=404, description="Requested data not found")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def get(self):
                if 'GET' in allowed_methods:
                    return data_service.get_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a POST request to INSERT some data")
            @ns.expect(resource_fields)
            @ns.response(code=400, description="Could not insert data")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def post(self):
                if 'POST' in allowed_methods:
                    return data_service.post_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a PUT request to UPDATE some data")
            @ns.expect(resource_fields)
            @ns.response(code=404, description="Query had no match")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def put(self):
                if 'PUT' in allowed_methods:
                    return data_service.put_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a DELETE request to DELETE some data")
            @ns.expect(resource_fields)
            @ns.response(code=404, description="Query had no match")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def delete(self, _id=None):
                # `_id` is accepted but unused; the request is forwarded as is.
                if 'DELETE' in allowed_methods:
                    return data_service.delete_data(request)
                return "METHOD NOT ALLOWED", 405

        ns.add_resource(SchemaResource, "")

        # Hide the handlers of verbs that are not enabled.
        for verb in ("GET", "PUT", "POST", "DELETE"):
            if verb not in allowed_methods:
                ns.hide(getattr(SchemaResource, verb.lower()))

        return ns

    def create_indexes(self):
        """Create the indexes declared on the schema's ``Meta`` class.

        ``Meta.simple_indexes`` yields one ``create_index`` call per entry.
        Each entry of ``Meta.compound_indexes`` is an iterable of field names
        turned into one index created with ``unique=True``.  Schemas declaring
        neither attribute create nothing.

        Index creation stays best-effort: an ``AttributeError`` raised while
        creating indexes is logged and does not propagate.
        """
        coll = mongodata.get_collection(self.collection)
        meta = getattr(self.schema, "Meta", None)

        try:
            for field_name in getattr(meta, "simple_indexes", ()):
                coll.create_index(field_name)
        except AttributeError as err:
            log.warning(f"Simple indexes not created for '{self.collection}': {err}")

        try:
            for field_names in getattr(meta, "compound_indexes", ()):
                # (field, 1) pairs -- presumably pymongo's ascending direction.
                index_keys = [(field_name, 1) for field_name in field_names]
                coll.create_index(index_keys, unique=True)
        except AttributeError as err:
            log.warning(f"Compound indexes not created for '{self.collection}': {err}")
Classes
class BaseSchema (*, only: Union[Sequence[str], Set[str], None] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict[~KT, ~VT]] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)
-
Base schema class with which to define custom schemas.
Example usage:
.. code-block:: python
    import datetime as dt
    from dataclasses import dataclass

    from marshmallow import Schema, fields


    @dataclass
    class Album:
        title: str
        release_date: dt.date


    class AlbumSchema(Schema):
        title = fields.Str()
        release_date = fields.Date()


    album = Album("Beggars Banquet", dt.date(1968, 12, 6))
    schema = AlbumSchema()
    data = schema.dump(album)
    data  # {'release_date': '1968-12-06', 'title': 'Beggars Banquet'}
:param only: Whitelist of the declared fields to select when instantiating the Schema. If None, all fields are used. Nested fields can be represented with dot delimiters.
:param exclude: Blacklist of the declared fields to exclude when instantiating the Schema. If a field appears in both `only` and `exclude`, it is not used. Nested fields can be represented with dot delimiters.
:param many: Should be set to `True` if `obj` is a collection so that the object will be serialized to a list.
:param context: Optional context passed to :class:`fields.Method` and :class:`fields.Function` fields.
:param load_only: Fields to skip during serialization (write-only fields)
:param dump_only: Fields to skip during deserialization (read-only fields)
:param partial: Whether to ignore missing fields and not require any fields declared. Propagates down to `Nested` fields as well. If its value is an iterable, only missing fields listed in that iterable will be ignored. Use dot delimiters to specify nested fields.
:param unknown: Whether to exclude, include, or raise an error for unknown fields in the data. Use `EXCLUDE`, `INCLUDE` or `RAISE`.

Changed in version 3.0.0: `prefix` parameter removed.

Changed in version 2.0.0: `__validators__`, `__preprocessors__`, and `__data_handlers__` are removed in favor of `marshmallow.decorators.validates_schema`, `marshmallow.decorators.pre_load` and `marshmallow.decorators.post_dump`. `__accessor__` and `__error_handler__` are deprecated. Implement the `handle_error` and `get_attribute` methods instead.
Expand source code
class BaseSchema(Schema):
    """Fields shared by every schema served through ``SchemaNamespace``."""

    # Mandatory; the value is checked by `validate_uuid4`.
    tenant_id = fields.String(validate=validate_uuid4, required=True)
    # Optional string field.
    updated_at = fields.String(required=False)
Ancestors
- marshmallow.schema.Schema
- marshmallow.base.SchemaABC
Class variables
var opts
var tenant_id
var updated_at
class SchemaNamespace (schema: type = None, collection: str = None, methods: list = ['GET', 'POST', 'PUT', 'DELETE'], decorators: list = [<function authorization_check>], disable_model: bool = False, authorizations: dict = {'Tenantid': {'type': 'apiKey', 'in': 'header', 'name': 'Tenantid'}, 'Authorization': {'type': 'apiKey', 'in': 'header', 'name': 'Authorization'}}, mongo_crud_class: type = licenseware.schema_namespace.mongo_crud.MongoCrud, namespace: flask_restx.namespace.Namespace = None)
-
Expand source code
class SchemaNamespace:
    """Generate a flask-restx ``Namespace`` with CRUD routes from a marshmallow schema.

    The received schema is combined with ``BaseSchema`` so the served schema
    also declares ``tenant_id`` and ``updated_at``.  When the schema's ``Meta``
    class declares ``collection`` or ``methods`` those values win over the
    constructor arguments.  ``Meta.simple_indexes`` and
    ``Meta.compound_indexes`` are read by :meth:`create_indexes`.

    Args:
        schema: marshmallow ``Schema`` subclass (the class, not an instance).
        collection: Collection name used when ``Meta.collection`` is not
            declared; if empty, ``envs.MONGO_COLLECTION_DATA_NAME`` is used.
        methods: HTTP verbs to enable when ``Meta.methods`` is not declared.
        decorators: Decorators handed to the generated ``Namespace``.
        disable_model: When ``True`` an empty restx model is registered
            instead of one converted from the schema.
        authorizations: Passed to the ``Namespace``; its keys are also used as
            the namespace ``security`` list.
        mongo_crud_class: Class instantiated with ``schema`` and ``collection``;
            its ``get_data``/``post_data``/``put_data``/``delete_data`` methods
            handle the requests.
        namespace: Stored as ``self.namespace``.

    Raises:
        TypeError: If ``schema`` is not a class.
    """

    def __init__(
        self,
        schema: type = None,
        collection: str = None,
        methods: list = http_methods,
        decorators: list = [authorization_check],
        disable_model: bool = False,
        authorizations: dict = swagger_authorization_header,
        mongo_crud_class: type = MongoCrud,
        namespace: Namespace = None
    ):
        # Fail early with a readable message instead of an AttributeError on
        # `__name__` further down.
        if not isinstance(schema, type):
            raise TypeError(
                f"`schema` must be a marshmallow Schema class, got {schema!r}"
            )

        # The namespace description comes from the *received* schema; the
        # combined class created below has no docstring of its own.
        self.doc = schema.__doc__

        # Adding `tenant_id` and `updated_at` fields to received schema
        self.schema = type(schema.__name__, (schema, BaseSchema,), {})

        # Only a missing Meta attribute means "not declared"; anything else
        # should surface instead of being swallowed.
        try:
            self.collection = self.schema.Meta.collection
        except AttributeError:
            self.collection = collection or envs.MONGO_COLLECTION_DATA_NAME

        try:
            self.methods = self.schema.Meta.methods
        except AttributeError:
            self.methods = methods

        # Copy so the default list in the signature is never shared with (or
        # mutated through) a generated Namespace.
        self.decorators = list(decorators) if decorators else []
        self.disable_model = disable_model
        self.authorizations = authorizations
        self.mongo_crud_class = mongo_crud_class
        # NOTE(review): stored but not consulted by `create_namespace`, which
        # always builds a new Namespace -- confirm whether it should be reused.
        self.namespace = namespace

        self.schema_name = self.schema.__name__
        self.name = self.schema_name.replace("Schema", "")
        self.path = "/" + self.name.lower()

    def initialize(self) -> Namespace:
        """ Create restx api namespace from schema """
        self.create_indexes()
        return self.create_resources()

    def create_namespace(self) -> Namespace:
        """Build the ``Namespace`` from the name, path, doc and auth settings.

        The description is the received schema's docstring, or a generated
        sentence naming the schema when it has none.
        """
        return Namespace(
            name=self.name,
            path=self.path,
            description=self.doc or f"API is generated from {self.schema_name}",
            decorators=self.decorators,
            authorizations=self.authorizations,
            security=list(self.authorizations.keys())
        )

    def create_resources(self) -> Namespace:
        """Create the namespace and register the CRUD resource on it.

        Every handler forwards the flask ``request`` to the matching method of
        the ``mongo_crud_class`` instance.  A verb missing from
        ``self.methods`` answers ``"METHOD NOT ALLOWED", 405`` and its handler
        is passed to ``ns.hide``.

        Returns:
            The ``Namespace`` with ``SchemaResource`` registered at ``""``.
        """
        ns = self.create_namespace()

        if self.disable_model is True:
            resource_fields = ns.model(self.name, {})
        else:
            resource_fields = marshmallow_to_restx_model(ns, self.schema)

        allowed_methods = self.methods
        data_service = self.mongo_crud_class(schema=self.schema, collection=self.collection)

        class SchemaResource(Resource):

            @failsafe(fail_code=500)
            @ns.doc(id="Make a GET request to FETCH some data")
            @ns.param('_id', 'get data by id')
            @ns.response(code=200, description="A list of:", model=[resource_fields])
            @ns.response(code=404, description="Requested data not found")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def get(self):
                if 'GET' in allowed_methods:
                    return data_service.get_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a POST request to INSERT some data")
            @ns.expect(resource_fields)
            @ns.response(code=400, description="Could not insert data")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def post(self):
                if 'POST' in allowed_methods:
                    return data_service.post_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a PUT request to UPDATE some data")
            @ns.expect(resource_fields)
            @ns.response(code=404, description="Query had no match")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def put(self):
                if 'PUT' in allowed_methods:
                    return data_service.put_data(request)
                return "METHOD NOT ALLOWED", 405

            @failsafe(fail_code=500)
            @ns.doc(id="Make a DELETE request to DELETE some data")
            @ns.expect(resource_fields)
            @ns.response(code=404, description="Query had no match")
            @ns.response(code=405, description="METHOD NOT ALLOWED")
            def delete(self, _id=None):
                # `_id` is accepted but unused; the request is forwarded as is.
                if 'DELETE' in allowed_methods:
                    return data_service.delete_data(request)
                return "METHOD NOT ALLOWED", 405

        ns.add_resource(SchemaResource, "")

        # Hide the handlers of verbs that are not enabled.
        for verb in ("GET", "PUT", "POST", "DELETE"):
            if verb not in allowed_methods:
                ns.hide(getattr(SchemaResource, verb.lower()))

        return ns

    def create_indexes(self):
        """Create the indexes declared on the schema's ``Meta`` class.

        ``Meta.simple_indexes`` yields one ``create_index`` call per entry.
        Each entry of ``Meta.compound_indexes`` is an iterable of field names
        turned into one index created with ``unique=True``.  Schemas declaring
        neither attribute create nothing.

        Index creation stays best-effort: an ``AttributeError`` raised while
        creating indexes is logged and does not propagate.
        """
        coll = mongodata.get_collection(self.collection)
        meta = getattr(self.schema, "Meta", None)

        try:
            for field_name in getattr(meta, "simple_indexes", ()):
                coll.create_index(field_name)
        except AttributeError as err:
            log.warning(f"Simple indexes not created for '{self.collection}': {err}")

        try:
            for field_names in getattr(meta, "compound_indexes", ()):
                # (field, 1) pairs -- presumably pymongo's ascending direction.
                index_keys = [(field_name, 1) for field_name in field_names]
                coll.create_index(index_keys, unique=True)
        except AttributeError as err:
            log.warning(f"Compound indexes not created for '{self.collection}': {err}")
Subclasses
Methods
def create_indexes(self)
-
Expand source code
def create_indexes(self):
    """Create the indexes declared on the schema's ``Meta`` class.

    ``Meta.simple_indexes`` yields one ``create_index`` call per entry.  Each
    entry of ``Meta.compound_indexes`` is an iterable of field names turned
    into one index created with ``unique=True``.  Schemas declaring neither
    attribute create nothing.

    Index creation stays best-effort: an ``AttributeError`` raised while
    creating indexes is logged and does not propagate.
    """
    coll = mongodata.get_collection(self.collection)
    meta = getattr(self.schema, "Meta", None)

    try:
        for field_name in getattr(meta, "simple_indexes", ()):
            coll.create_index(field_name)
    except AttributeError as err:
        log.warning(f"Simple indexes not created for '{self.collection}': {err}")

    try:
        for field_names in getattr(meta, "compound_indexes", ()):
            # (field, 1) pairs -- presumably pymongo's ascending direction.
            index_keys = [(field_name, 1) for field_name in field_names]
            coll.create_index(index_keys, unique=True)
    except AttributeError as err:
        log.warning(f"Compound indexes not created for '{self.collection}': {err}")
def create_namespace(self) ‑> flask_restx.namespace.Namespace
-
Expand source code
def create_namespace(self) -> Namespace:
    """Build the ``Namespace`` from the instance's name, path, doc and auth settings.

    The description is ``self.doc`` when it is truthy, otherwise a generated
    sentence naming the schema.  The ``security`` list is made of the keys of
    ``self.authorizations``.
    """
    namespace_options = {
        "name": self.name,
        "path": self.path,
        "description": self.doc or f"API is generated from {self.schema_name}",
        "decorators": self.decorators,
        "authorizations": self.authorizations,
        "security": list(self.authorizations.keys()),
    }
    return Namespace(**namespace_options)
def create_resources(self) ‑> list
-
Expand source code
def create_resources(self) -> Namespace:
    """Create the namespace and register the CRUD resource on it.

    Every handler forwards the flask ``request`` to the matching method of the
    ``mongo_crud_class`` instance.  A verb missing from ``self.methods``
    answers ``"METHOD NOT ALLOWED", 405`` and its handler is passed to
    ``ns.hide``.

    Returns:
        The ``Namespace`` with ``SchemaResource`` registered at ``""``.
    """
    ns = self.create_namespace()

    if self.disable_model is True:
        resource_fields = ns.model(self.name, {})
    else:
        resource_fields = marshmallow_to_restx_model(ns, self.schema)

    allowed_methods = self.methods
    data_service = self.mongo_crud_class(schema=self.schema, collection=self.collection)

    class SchemaResource(Resource):

        @failsafe(fail_code=500)
        @ns.doc(id="Make a GET request to FETCH some data")
        @ns.param('_id', 'get data by id')
        @ns.response(code=200, description="A list of:", model=[resource_fields])
        @ns.response(code=404, description="Requested data not found")
        @ns.response(code=405, description="METHOD NOT ALLOWED")
        def get(self):
            if 'GET' in allowed_methods:
                return data_service.get_data(request)
            return "METHOD NOT ALLOWED", 405

        @failsafe(fail_code=500)
        @ns.doc(id="Make a POST request to INSERT some data")
        @ns.expect(resource_fields)
        @ns.response(code=400, description="Could not insert data")
        @ns.response(code=405, description="METHOD NOT ALLOWED")
        def post(self):
            if 'POST' in allowed_methods:
                return data_service.post_data(request)
            return "METHOD NOT ALLOWED", 405

        @failsafe(fail_code=500)
        @ns.doc(id="Make a PUT request to UPDATE some data")
        @ns.expect(resource_fields)
        @ns.response(code=404, description="Query had no match")
        @ns.response(code=405, description="METHOD NOT ALLOWED")
        def put(self):
            if 'PUT' in allowed_methods:
                return data_service.put_data(request)
            return "METHOD NOT ALLOWED", 405

        @failsafe(fail_code=500)
        @ns.doc(id="Make a DELETE request to DELETE some data")
        @ns.expect(resource_fields)
        @ns.response(code=404, description="Query had no match")
        @ns.response(code=405, description="METHOD NOT ALLOWED")
        def delete(self, _id=None):
            # `_id` is accepted but unused; the request is forwarded as is.
            if 'DELETE' in allowed_methods:
                return data_service.delete_data(request)
            return "METHOD NOT ALLOWED", 405

    ns.add_resource(SchemaResource, "")

    # Hide the handlers of verbs that are not enabled.
    for verb in ("GET", "PUT", "POST", "DELETE"):
        if verb not in allowed_methods:
            ns.hide(getattr(SchemaResource, verb.lower()))

    return ns
def initialize(self) ‑> flask_restx.namespace.Namespace
-
Create restx api namespace from schema
Expand source code
def initialize(self) -> Namespace:
    """Create the restx api namespace from the schema.

    Runs ``create_indexes`` first, then returns whatever
    ``create_resources`` returns.
    """
    self.create_indexes()
    return self.create_resources()