Module licenseware.repository.postgres.postgres_repository

Usage:


from marshmallow import Schema, fields
from licenseware.repository.postgres import PostgresRepository

db_url = f"postgresql://{ os.environ['POSTGRES_USERNAME'] }:{ os.environ['POSTGRES_PASSWORD'] }@{ os.environ['POSTGRES_HOSTNAME'] }/{ os.environ['POSTGRES_DB'] }"

db = PostgresRepository(db_url)


# Define a marshmallow schema

class UsersSchema(Schema):

    class Meta:
        __tablename__ = 'users'

    name = fields.String(required=True)
    email = fields.String(required=True)
    address = fields.String(required=True)


# __tablename__ is optional, if not completed `users` will be the table name from schema.__name__

db.register_schema(UsersSchema)


# Insert some data

db.insert_one(
    schema = UsersSchema,
    name = 'Travis',
    email = 'travis@gmail.com',
    address = 'Oradea, Centru'
)


# Insert some data in bulk

db.insert_many(
    schema = UsersSchema,
    data=[
        {
            'name': 'Anakin',
            'email': 'anakin@gmail.com',
            'address': 'Calea Lactee'
        },
        {
            'name': 'Bum',
            'email': 'bum@gmail.com',
            'address': 'Calea Lactee'
        },
        {
            'name': 'Doka',
            'email': 'doka@gmail.com',
            'address': 'Calea Lactee'
        }
    ]
)


# Fetching some data 

# First item that matched
db.fetch_one(schema_name="UsersSchema", email='travis@gmail.com')
db.fetch_by_id(schema_name="UsersSchema", id=13)

# All items that matched query
db.fetch_many(schema_name="UsersSchema", address='Oradea, Centru')

# All table
db.fetch_many(schema_name="UsersSchema")



# SQLAlchemy models can be created by inheriting from declarative base (db.Base)

class UsersTable(db.Base):

    __tablename__ = "users"
    __table_args__ = {'extend_existing': True} 

    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    name = sa.Column(sa.String)
    email = sa.Column(sa.String, unique=True)
    address = sa.Column(sa.String)


# Insert with tuple of sqlalchemy model and marshmallow schema for validation

db.insert_one(
    schema = (UsersTable, UsersSchema),
    name = 'John',
    email = 'john@gmail.com',
    address = 'Iasi, Centru'
)

# Count items matched by query

count_nbr = db.count(schema_name='UsersSchema', address='Calea Lactee')


# Update some data

db.update_one(
    schema_name='UsersSchema',
    filters={'name':'Birt'},
    data={
        'email': 'birt_updated@gmail.com'
    }
)


db.update_on_id(
    schema_name='UsersSchema', 
    id=user['id'], 
    data={'email': 'birt@gmail.com'}
)



db.update_many(
    schema_name='UsersSchema',
    filters={'address': 'Calea Lactee'},
    data={
        'address': 'Mars, Calea Lactee'
    }
)


# Delete some data


db.delete_one(
    schema_name='UsersSchema',
    email='birt@gmail.com'
)


db.delete_by_id(
    schema_name='UsersSchema',
    id=user['id']
)

db.delete_many(schema_name='UsersSchema')



# Connecting to bare bones sqlalchemy 


with db.engine.connect() as conn:

    result = conn.execute(sa.insert(UsersTable),
            [
                {
                    'name': 'Etra',
                    'email': 'etra@gmail.com',
                    'address': 'Budabesta'
                },
                {
                    'name': 'Tyron',
                    'email': 'tyron@gmail.com',
                    'address': 'Budabesta'
                },
                {
                    'name': 'Pesta',
                    'email': 'pesta@gmail.com',
                    'address': 'Budabesta'
                }
            ]
        )


# or using the session object with raw sql or sqlalchemy's sql builder


sql_query = "SELECT * FROM users"

cursor = db.execute(sql_query)

# print(cursor.all())

users = db.deserialize(schema_name='UsersSchema', cursor_data=cursor.all())

# print(users)


# the sql query can be just a string

sql_query = "DELETE FROM users *"

cursor = db.execute(sql_query)


# You set commit to true on execute or you can commit later

db.session.commit()




All migrations will be handled automatically

TODO integrate column paramters in marshmallow schema

TODO include Base.metadata in PostgresMigrations

Expand source code
"""

Usage:

```py

from marshmallow import Schema, fields
from licenseware.repository.postgres import PostgresRepository

db_url = f"postgresql://{ os.environ['POSTGRES_USERNAME'] }:{ os.environ['POSTGRES_PASSWORD'] }@{ os.environ['POSTGRES_HOSTNAME'] }/{ os.environ['POSTGRES_DB'] }"

db = PostgresRepository(db_url)


# Define a marshmallow schema

class UsersSchema(Schema):
    
    class Meta:
        __tablename__ = 'users'
    
    name = fields.String(required=True)
    email = fields.String(required=True)
    address = fields.String(required=True)


# __tablename__ is optional, if not completed `users` will be the table name from schema.__name__

db.register_schema(UsersSchema)


# Insert some data

db.insert_one(
    schema = UsersSchema,
    name = 'Travis',
    email = 'travis@gmail.com',
    address = 'Oradea, Centru'
)


# Insert some data in bulk

db.insert_many(
    schema = UsersSchema,
    data=[
        {
            'name': 'Anakin',
            'email': 'anakin@gmail.com',
            'address': 'Calea Lactee'
        },
        {
            'name': 'Bum',
            'email': 'bum@gmail.com',
            'address': 'Calea Lactee'
        },
        {
            'name': 'Doka',
            'email': 'doka@gmail.com',
            'address': 'Calea Lactee'
        }
    ]
)


# Fetching some data 

# First item that matched
db.fetch_one(schema_name="UsersSchema", email='travis@gmail.com')
db.fetch_by_id(schema_name="UsersSchema", id=13)

# All items that matched query
db.fetch_many(schema_name="UsersSchema", address='Oradea, Centru')

# All table
db.fetch_many(schema_name="UsersSchema")



# SQLAlchemy models can be created by inheriting from declarative base (db.Base)

class UsersTable(db.Base):
    
    __tablename__ = "users"
    __table_args__ = {'extend_existing': True} 
    
    id = sa.Column(sa.Integer, primary_key=True, autoincrement=True)
    name = sa.Column(sa.String)
    email = sa.Column(sa.String, unique=True)
    address = sa.Column(sa.String)


# Insert with tuple of sqlalchemy model and marshmallow schema for validation

db.insert_one(
    schema = (UsersTable, UsersSchema),
    name = 'John',
    email = 'john@gmail.com',
    address = 'Iasi, Centru'
)

# Count items matched by query

count_nbr = db.count(schema_name='UsersSchema', address='Calea Lactee')


# Update some data

db.update_one(
    schema_name='UsersSchema',
    filters={'name':'Birt'},
    data={
        'email': 'birt_updated@gmail.com'
    }
)


db.update_on_id(
    schema_name='UsersSchema', 
    id=user['id'], 
    data={'email': 'birt@gmail.com'}
)



db.update_many(
    schema_name='UsersSchema',
    filters={'address': 'Calea Lactee'},
    data={
        'address': 'Mars, Calea Lactee'
    }
)


# Delete some data


db.delete_one(
    schema_name='UsersSchema',
    email='birt@gmail.com'
)


db.delete_by_id(
    schema_name='UsersSchema',
    id=user['id']
)

db.delete_many(schema_name='UsersSchema')



# Connecting to bare bones sqlalchemy 


with db.engine.connect() as conn:
    
    result = conn.execute(sa.insert(UsersTable),
            [
                {
                    'name': 'Etra',
                    'email': 'etra@gmail.com',
                    'address': 'Budabesta'
                },
                {
                    'name': 'Tyron',
                    'email': 'tyron@gmail.com',
                    'address': 'Budabesta'
                },
                {
                    'name': 'Pesta',
                    'email': 'pesta@gmail.com',
                    'address': 'Budabesta'
                }
            ]
        )
    

# or using the session object with raw sql or sqlalchemy's sql builder


sql_query = "SELECT * FROM users"

cursor = db.execute(sql_query)

# print(cursor.all())

users = db.deserialize(schema_name='UsersSchema', cursor_data=cursor.all())

# print(users)


# the sql query can be just a string

sql_query = "DELETE FROM users *"

cursor = db.execute(sql_query)


# You set commit to true on execute or you can commit later

db.session.commit()




```

All migrations will be handled automatically


# TODO integrate column paramters in marshmallow schema
# TODO include Base.metadata in PostgresMigrations



"""

from typing import List, Union
from collections.abc import Iterable

import sqlalchemy as sa
from sqlalchemy.orm import mapper
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.engine.cursor import CursorResult

from marshmallow import Schema, fields
from marshmallow.exceptions import ValidationError
from licenseware.repository.interface import RepositoryInterface

from .postgres_migrations import PostgresMigrations

# Marshmallow to Sqlalchemy fields mapper
# Only strings, integers, floats and booleans are suported for now
MATOSA_MAPPER = {
    fields.String: sa.String,
    fields.Str: sa.String,
    fields.Integer: sa.Integer,
    fields.Int: sa.Integer,
    fields.Float: sa.Float,
    fields.Boolean: sa.Boolean,
    fields.Bool: sa.Boolean
}


class PostgresRepository(RepositoryInterface):

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.engine = sa.create_engine(db_url)
        self.session = scoped_session(sessionmaker(bind=self.engine))
        self.Base = declarative_base(bind=self.engine)
        self.registered_tables = {}
        self.registered_schemas = {}

    def create_all(self):
        """ Create connection to db, create all tables and generate migration files """

        self.Base.metadata.create_all()
        migrations = PostgresMigrations(self.db_url, target_metadata=self.Base.metadata)
        migrations.make_migrations()

    def register_schema(self, schema: Schema) -> str:
        """ Register schema """

        table = self._get_table_from_schema(schema)
        self.registered_tables[schema.__name__] = table
        self.registered_schemas[schema.__name__] = schema

        if not self.has_table(table.__tablename__):
            self.create_all()  # create table if doesn't exist

        return table.__tablename__

    # Fetching data

    def _deserialize_value(self, d: any, field_name: str, idx: int):

        if isinstance(d, list):
            for item in d:
                if isinstance(item, tuple):
                    return item[idx]

        if isinstance(d, Iterable):
            if len(d) == 1:
                return getattr(d[0], field_name)

        return getattr(d, field_name)

    def deserialize(self, schema_name: str, cursor_data: any):

        if cursor_data is None: return [{}]

        schema = self.registered_schemas[schema_name]

        datali = []
        for d in cursor_data:

            data_dict = {}

            # Getting the id
            data_dict['id'] = self._deserialize_value(d, field_name='id', idx=0)

            for idx, field_name in enumerate(schema().declared_fields.keys()):
                data_dict[field_name] = self._deserialize_value(d, field_name, idx + 1)

            datali.append(data_dict)

        return datali

    def fetch(self, schema_name: str, id: str = None, first: bool = False, limit: int = None, skip: int = None,
              **filters) -> List[dict]:
        """ 
        TODO limit and skip
        """

        first = first or ('id' in filters and len(filters) == 1)

        table = self.registered_tables[schema_name]
        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

        cursor = self.execute(sql_query)

        cursor_data = cursor.first() if first else cursor.all()
        deserialized_data = self.deserialize(schema_name, cursor_data)

        return deserialized_data[0] if first else deserialized_data

    def fetch_one(self, schema_name: str, **filters) -> dict:
        """ Get first item match """
        return self.fetch(schema_name, first=True, **filters)

    def fetch_by_id(self, schema_name: str, id: str) -> dict:
        """ Get first item match by id """
        return self.fetch(schema_name, first=True, id=id)

    def fetch_many(self, schema_name: str, **filters) -> List[dict]:
        """ Get all items that matched  """
        return self.fetch(schema_name, **filters)

        # Inserting new data

    def _sa_field(self, ma_field):
        try:
            return MATOSA_MAPPER[type(ma_field)]
        except:
            raise ValueError("Only strings, integers, floats and booleans are suported for the moment")

    def _get_table_name_from_schema(self, schema: Schema):

        if self.registered_tables.get(schema.__name__):
            raise Exception(f"Schema '{schema.__name__}' already registered")

        table_name = None
        if hasattr(schema, 'Meta'):
            if hasattr(schema.Meta, '__tablename__'):
                table_name = schema.Meta.__tablename__

        if not table_name:
            table_name = schema.__name__.replace("Schema", "").lower()

        return table_name

    def _get_columns(self, schema: Schema):

        sa_cols = []
        for field_name, temp_field_type in schema().declared_fields.items():
            field_type = self._sa_field(temp_field_type)
            sa_cols.append(sa.Column(field_name, field_type))
        return sa_cols

    def _get_table_from_schema(self, schema: Schema):
        """ Convert a marshmallow schema to a sqlalchemy model """

        table_name = self._get_table_name_from_schema(schema)
        columns = self._get_columns(schema)

        Table = sa.Table(
            table_name,
            self.Base.metadata,
            sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
            *columns,
            extend_existing=True,
        )

        Model = type(table_name, (object,), {'__tablename__': table_name})

        mapper(Model, Table)

        return Model

    def has_table(self, table_name: str):

        has_table = (
            sa.inspect(self.engine).dialect
                .has_table(self.engine.connect(), table_name)
        )

        return has_table

    def execute(self, sql_query: str, commit: bool = False) -> CursorResult:
        """
            Execute SQLAlchemy on db queries like:
            sql_query = select(UsersTable).where(UsersTable.name == 'John')
            
            Returns a cursor from which you can get results.
            
        """

        cursor: CursorResult = self.session \
            .execute(sa.text(sql_query) if isinstance(sql_query, str) else sql_query)

        if commit: self.session.commit()

        return cursor

    def insert(self, schema: Schema, data: Union[dict, List[dict]]) -> CursorResult:
        """ Insert dict or list of dict to db """

        assert isinstance(data, dict) or isinstance(data, list)
        if isinstance(data, dict): data = [data]

        if isinstance(schema, tuple):
            table = schema[0]
            schema[1](many=True).load(data)  # marshmallow validate
        elif hasattr(schema, '__tablename__'):
            table = schema  # it's an SQLAlchemy Model
        else:
            schema(many=True).load(data)  # marshmallow validate
            table = self.registered_tables[schema.__name__]

        sql_query = sa.insert(table).values(data).returning(sa.text('id'))
        cursor = self.execute(sql_query, commit=True)

        return cursor.all()

    def insert_one(self, schema: Schema, data: dict = None, **data_kwargs) -> CursorResult:
        """ Insert dict to db """
        return self.insert(schema, data or data_kwargs)

    def insert_with_id(self, schema: Schema, id: Union[str, int], data: dict = None, **data_kwargs) -> CursorResult:
        """ Insert dict with a specific 'id' """
        raise NotImplementedError

    def insert_many(self, schema: Schema, data: List[dict], validate_percentage: float = 1.0) -> CursorResult:
        """
            TODO `validate_percentage`: what percentage of the list of data provided should be validated (0.5 is 50%)
        """
        return self.insert(schema, data)

    # Updating existing data

    def count(self, schema_name: str, sql_query: any = None, **filters) -> int:

        table = self.registered_tables[schema_name]

        if not sql_query:
            filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
            sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

        cursor = self.execute(sql_query)
        counted_items = len(cursor.all())

        # TODO not working
        # counted_items = self.session.query(sa.select([sa.func.count()]).select_from(sql_query)).count()

        return counted_items

    def _validate_on_update(self, schema_name: str, data: Union[dict, List[dict]]):

        try:
            # marshmallow validate
            schema = self.registered_schemas[schema_name]

            schema(many=True).load(data) if isinstance(data, list) else schema().load(data)

        except ValidationError as Errors:

            # When updating we are only interested if the field to be updated is valid

            errors_dict = Errors.normalized_messages()

            if isinstance(data, list):
                for d in data:
                    field_found = set.intersection(set(errors_dict.keys()), set(d.keys()))
                    if field_found: raise Errors
            else:
                field_found = set.intersection(set(errors_dict.keys()), set(data.keys()))
                if field_found: raise Errors

    def update(self, schema_name: str, filters: dict, data: Union[dict, List[dict]],
               first: bool = False) -> CursorResult:
        """ """

        self._validate_on_update(schema_name, data)

        table = self.registered_tables[schema_name]

        counted_items = self.count(schema_name, **filters)
        if first is True:
            if counted_items != 1:
                raise ValueError("Can't update one item with given filters")

        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.update(table).where(eval(filters_string)).values(**data)

        cursor = self.execute(sql_query, commit=True)

        return cursor

    def update_one(self, schema_name: str, filters: dict, data: dict) -> CursorResult:
        """ Update one item """
        return self.update(schema_name, filters=filters, data=data, first=True)

    def update_on_id(self, schema_name: str, id: Union[str, int], data: dict) -> CursorResult:
        """ Update one item using the id """
        return self.update(schema_name, filters={'id': id}, data=data, first=True)

    def update_many(self, schema_name: str, filters: dict, data: List[dict],
                    validate_percentage: float = 1.0) -> CursorResult:
        """
            Update many
            TODO `validate_percentage`: what percentage of the list of data provided should be validated (0.5 is 50% of items in list)
        """
        return self.update(schema_name, filters=filters, data=data)

    # Deleting existing data

    def delete(self, schema_name: str, filters: dict, first: bool = False) -> CursorResult:
        """ Delete items """

        table = self.registered_tables[schema_name]

        counted_items = self.count(schema_name, **filters)
        if first is True:
            if counted_items != 1:
                raise ValueError("Can't delete one item with given filters")

        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.delete(table).where(eval(filters_string)) if filters_string else sa.delete(table)

        cursor = self.execute(sql_query, commit=True)

        return cursor

    def delete_one(self, schema_name: str, **filters) -> CursorResult:
        """ Delete one item """
        return self.delete(schema_name, filters, first=True)

    def delete_by_id(self, schema_name: str, id: str) -> CursorResult:
        """ Delete one item using the id provided """
        return self.delete(schema_name, filters={'id': id}, first=True)

    def delete_many(self, schema_name: str, **filters) -> CursorResult:
        """ Delete items that match filters """
        return self.delete(schema_name, filters)

Classes

class PostgresRepository (db_url: str)
Expand source code
class PostgresRepository(RepositoryInterface):

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.engine = sa.create_engine(db_url)
        self.session = scoped_session(sessionmaker(bind=self.engine))
        self.Base = declarative_base(bind=self.engine)
        self.registered_tables = {}
        self.registered_schemas = {}

    def create_all(self):
        """ Create connection to db, create all tables and generate migration files """

        self.Base.metadata.create_all()
        migrations = PostgresMigrations(self.db_url, target_metadata=self.Base.metadata)
        migrations.make_migrations()

    def register_schema(self, schema: Schema) -> str:
        """ Register schema """

        table = self._get_table_from_schema(schema)
        self.registered_tables[schema.__name__] = table
        self.registered_schemas[schema.__name__] = schema

        if not self.has_table(table.__tablename__):
            self.create_all()  # create table if doesn't exist

        return table.__tablename__

    # Fetching data

    def _deserialize_value(self, d: any, field_name: str, idx: int):

        if isinstance(d, list):
            for item in d:
                if isinstance(item, tuple):
                    return item[idx]

        if isinstance(d, Iterable):
            if len(d) == 1:
                return getattr(d[0], field_name)

        return getattr(d, field_name)

    def deserialize(self, schema_name: str, cursor_data: any):

        if cursor_data is None: return [{}]

        schema = self.registered_schemas[schema_name]

        datali = []
        for d in cursor_data:

            data_dict = {}

            # Getting the id
            data_dict['id'] = self._deserialize_value(d, field_name='id', idx=0)

            for idx, field_name in enumerate(schema().declared_fields.keys()):
                data_dict[field_name] = self._deserialize_value(d, field_name, idx + 1)

            datali.append(data_dict)

        return datali

    def fetch(self, schema_name: str, id: str = None, first: bool = False, limit: int = None, skip: int = None,
              **filters) -> List[dict]:
        """ 
        TODO limit and skip
        """

        first = first or ('id' in filters and len(filters) == 1)

        table = self.registered_tables[schema_name]
        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

        cursor = self.execute(sql_query)

        cursor_data = cursor.first() if first else cursor.all()
        deserialized_data = self.deserialize(schema_name, cursor_data)

        return deserialized_data[0] if first else deserialized_data

    def fetch_one(self, schema_name: str, **filters) -> dict:
        """ Get first item match """
        return self.fetch(schema_name, first=True, **filters)

    def fetch_by_id(self, schema_name: str, id: str) -> dict:
        """ Get first item match by id """
        return self.fetch(schema_name, first=True, id=id)

    def fetch_many(self, schema_name: str, **filters) -> List[dict]:
        """ Get all items that matched  """
        return self.fetch(schema_name, **filters)

        # Inserting new data

    def _sa_field(self, ma_field):
        try:
            return MATOSA_MAPPER[type(ma_field)]
        except:
            raise ValueError("Only strings, integers, floats and booleans are suported for the moment")

    def _get_table_name_from_schema(self, schema: Schema):

        if self.registered_tables.get(schema.__name__):
            raise Exception(f"Schema '{schema.__name__}' already registered")

        table_name = None
        if hasattr(schema, 'Meta'):
            if hasattr(schema.Meta, '__tablename__'):
                table_name = schema.Meta.__tablename__

        if not table_name:
            table_name = schema.__name__.replace("Schema", "").lower()

        return table_name

    def _get_columns(self, schema: Schema):

        sa_cols = []
        for field_name, temp_field_type in schema().declared_fields.items():
            field_type = self._sa_field(temp_field_type)
            sa_cols.append(sa.Column(field_name, field_type))
        return sa_cols

    def _get_table_from_schema(self, schema: Schema):
        """ Convert a marshmallow schema to a sqlalchemy model """

        table_name = self._get_table_name_from_schema(schema)
        columns = self._get_columns(schema)

        Table = sa.Table(
            table_name,
            self.Base.metadata,
            sa.Column('id', sa.Integer, primary_key=True, autoincrement=True),
            *columns,
            extend_existing=True,
        )

        Model = type(table_name, (object,), {'__tablename__': table_name})

        mapper(Model, Table)

        return Model

    def has_table(self, table_name: str):

        has_table = (
            sa.inspect(self.engine).dialect
                .has_table(self.engine.connect(), table_name)
        )

        return has_table

    def execute(self, sql_query: str, commit: bool = False) -> CursorResult:
        """
            Execute SQLAlchemy on db queries like:
            sql_query = select(UsersTable).where(UsersTable.name == 'John')
            
            Returns a cursor from which you can get results.
            
        """

        cursor: CursorResult = self.session \
            .execute(sa.text(sql_query) if isinstance(sql_query, str) else sql_query)

        if commit: self.session.commit()

        return cursor

    def insert(self, schema: Schema, data: Union[dict, List[dict]]) -> CursorResult:
        """ Insert dict or list of dict to db """

        assert isinstance(data, dict) or isinstance(data, list)
        if isinstance(data, dict): data = [data]

        if isinstance(schema, tuple):
            table = schema[0]
            schema[1](many=True).load(data)  # marshmallow validate
        elif hasattr(schema, '__tablename__'):
            table = schema  # it's an SQLAlchemy Model
        else:
            schema(many=True).load(data)  # marshmallow validate
            table = self.registered_tables[schema.__name__]

        sql_query = sa.insert(table).values(data).returning(sa.text('id'))
        cursor = self.execute(sql_query, commit=True)

        return cursor.all()

    def insert_one(self, schema: Schema, data: dict = None, **data_kwargs) -> CursorResult:
        """ Insert dict to db """
        return self.insert(schema, data or data_kwargs)

    def insert_with_id(self, schema: Schema, id: Union[str, int], data: dict = None, **data_kwargs) -> CursorResult:
        """ Insert dict with a specific 'id' """
        raise NotImplementedError

    def insert_many(self, schema: Schema, data: List[dict], validate_percentage: float = 1.0) -> CursorResult:
        """
            TODO `validate_percentage`: what percentage of the list of data provided should be validated (0.5 is 50%)
        """
        return self.insert(schema, data)

    # Updating existing data

    def count(self, schema_name: str, sql_query: any = None, **filters) -> int:

        table = self.registered_tables[schema_name]

        if not sql_query:
            filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
            sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

        cursor = self.execute(sql_query)
        counted_items = len(cursor.all())

        # TODO not working
        # counted_items = self.session.query(sa.select([sa.func.count()]).select_from(sql_query)).count()

        return counted_items

    def _validate_on_update(self, schema_name: str, data: Union[dict, List[dict]]):

        try:
            # marshmallow validate
            schema = self.registered_schemas[schema_name]

            schema(many=True).load(data) if isinstance(data, list) else schema().load(data)

        except ValidationError as Errors:

            # When updating we are only interested if the field to be updated is valid

            errors_dict = Errors.normalized_messages()

            if isinstance(data, list):
                for d in data:
                    field_found = set.intersection(set(errors_dict.keys()), set(d.keys()))
                    if field_found: raise Errors
            else:
                field_found = set.intersection(set(errors_dict.keys()), set(data.keys()))
                if field_found: raise Errors

    def update(self, schema_name: str, filters: dict, data: Union[dict, List[dict]],
               first: bool = False) -> CursorResult:
        """ """

        self._validate_on_update(schema_name, data)

        table = self.registered_tables[schema_name]

        counted_items = self.count(schema_name, **filters)
        if first is True:
            if counted_items != 1:
                raise ValueError("Can't update one item with given filters")

        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.update(table).where(eval(filters_string)).values(**data)

        cursor = self.execute(sql_query, commit=True)

        return cursor

    def update_one(self, schema_name: str, filters: dict, data: dict) -> CursorResult:
        """ Update one item """
        return self.update(schema_name, filters=filters, data=data, first=True)

    def update_on_id(self, schema_name: str, id: Union[str, int], data: dict) -> CursorResult:
        """ Update one item using the id """
        return self.update(schema_name, filters={'id': id}, data=data, first=True)

    def update_many(self, schema_name: str, filters: dict, data: List[dict],
                    validate_percentage: float = 1.0) -> CursorResult:
        """
            Update many
            TODO `validate_percentage`: what percentage of the list of data provided should be validated (0.5 is 50% of items in list)
        """
        return self.update(schema_name, filters=filters, data=data)

    # Deleting existing data

    def delete(self, schema_name: str, filters: dict, first: bool = False) -> CursorResult:
        """ Delete items """

        table = self.registered_tables[schema_name]

        counted_items = self.count(schema_name, **filters)
        if first is True:
            if counted_items != 1:
                raise ValueError("Can't delete one item with given filters")

        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.delete(table).where(eval(filters_string)) if filters_string else sa.delete(table)

        cursor = self.execute(sql_query, commit=True)

        return cursor

    def delete_one(self, schema_name: str, **filters) -> CursorResult:
        """ Delete one item """
        return self.delete(schema_name, filters, first=True)

    def delete_by_id(self, schema_name: str, id: str) -> CursorResult:
        """ Delete one item using the id provided """
        return self.delete(schema_name, filters={'id': id}, first=True)

    def delete_many(self, schema_name: str, **filters) -> CursorResult:
        """ Delete items that match filters """
        return self.delete(schema_name, filters)

Ancestors

Methods

def count(self, schema_name: str, sql_query:  = None, **filters) ‑> int
Expand source code
def count(self, schema_name: str, sql_query: any = None, **filters) -> int:

    table = self.registered_tables[schema_name]

    if not sql_query:
        filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
        sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

    cursor = self.execute(sql_query)
    counted_items = len(cursor.all())

    # TODO not working
    # counted_items = self.session.query(sa.select([sa.func.count()]).select_from(sql_query)).count()

    return counted_items
def create_all(self)

Create connection to db, create all tables and generate migration files

Expand source code
def create_all(self):
    """ Create connection to db, create all tables and generate migration files """

    self.Base.metadata.create_all()
    migrations = PostgresMigrations(self.db_url, target_metadata=self.Base.metadata)
    migrations.make_migrations()
def delete(self, schema_name: str, filters: dict, first: bool = False) ‑> sqlalchemy.engine.cursor.CursorResult

Delete items

Expand source code
def delete(self, schema_name: str, filters: dict, first: bool = False) -> CursorResult:
    """ Delete items """

    table = self.registered_tables[schema_name]

    counted_items = self.count(schema_name, **filters)
    if first is True:
        if counted_items != 1:
            raise ValueError("Can't delete one item with given filters")

    filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
    sql_query = sa.delete(table).where(eval(filters_string)) if filters_string else sa.delete(table)

    cursor = self.execute(sql_query, commit=True)

    return cursor
def delete_by_id(self, schema_name: str, id: str) ‑> sqlalchemy.engine.cursor.CursorResult

Delete one item using the id provided

Expand source code
def delete_by_id(self, schema_name: str, id: str) -> CursorResult:
    """ Delete one item using the id provided """
    return self.delete(schema_name, filters={'id': id}, first=True)
def delete_many(self, schema_name: str, **filters) ‑> sqlalchemy.engine.cursor.CursorResult

Delete items that match filters

Expand source code
def delete_many(self, schema_name: str, **filters) -> CursorResult:
    """ Delete items that match filters """
    return self.delete(schema_name, filters)
def delete_one(self, schema_name: str, **filters) ‑> sqlalchemy.engine.cursor.CursorResult

Delete one item

Expand source code
def delete_one(self, schema_name: str, **filters) -> CursorResult:
    """ Delete one item """
    return self.delete(schema_name, filters, first=True)
def deserialize(self, schema_name: str, cursor_data: )
Expand source code
def deserialize(self, schema_name: str, cursor_data: any):

    if cursor_data is None: return [{}]

    schema = self.registered_schemas[schema_name]

    datali = []
    for d in cursor_data:

        data_dict = {}

        # Getting the id
        data_dict['id'] = self._deserialize_value(d, field_name='id', idx=0)

        for idx, field_name in enumerate(schema().declared_fields.keys()):
            data_dict[field_name] = self._deserialize_value(d, field_name, idx + 1)

        datali.append(data_dict)

    return datali
def execute(self, sql_query: str, commit: bool = False) ‑> sqlalchemy.engine.cursor.CursorResult

Execute SQLAlchemy on db queries like: sql_query = select(UsersTable).where(UsersTable.name == 'John')

Returns a cursor from which you can get results.

Expand source code
def execute(self, sql_query: str, commit: bool = False) -> CursorResult:
    """
        Execute SQLAlchemy on db queries like:
        sql_query = select(UsersTable).where(UsersTable.name == 'John')
        
        Returns a cursor from which you can get results.
        
    """

    cursor: CursorResult = self.session \
        .execute(sa.text(sql_query) if isinstance(sql_query, str) else sql_query)

    if commit: self.session.commit()

    return cursor
def fetch(self, schema_name: str, id: str = None, first: bool = False, limit: int = None, skip: int = None, **filters) ‑> List[dict]

TODO limit and skip

Expand source code
def fetch(self, schema_name: str, id: str = None, first: bool = False, limit: int = None, skip: int = None,
          **filters) -> List[dict]:
    """ 
    TODO limit and skip
    """

    first = first or ('id' in filters and len(filters) == 1)

    table = self.registered_tables[schema_name]
    filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
    sql_query = sa.select(table).where(eval(filters_string)) if filters_string else sa.select(table)

    cursor = self.execute(sql_query)

    cursor_data = cursor.first() if first else cursor.all()
    deserialized_data = self.deserialize(schema_name, cursor_data)

    return deserialized_data[0] if first else deserialized_data
def has_table(self, table_name: str)
Expand source code
def has_table(self, table_name: str):

    has_table = (
        sa.inspect(self.engine).dialect
            .has_table(self.engine.connect(), table_name)
    )

    return has_table
def insert(self, schema: marshmallow.schema.Schema, data: Union[dict, List[dict]]) ‑> sqlalchemy.engine.cursor.CursorResult

Insert dict or list of dict to db

Expand source code
def insert(self, schema: Schema, data: Union[dict, List[dict]]) -> CursorResult:
    """ Insert dict or list of dict to db """

    assert isinstance(data, dict) or isinstance(data, list)
    if isinstance(data, dict): data = [data]

    if isinstance(schema, tuple):
        table = schema[0]
        schema[1](many=True).load(data)  # marshmallow validate
    elif hasattr(schema, '__tablename__'):
        table = schema  # it's an SQLAlchemy Model
    else:
        schema(many=True).load(data)  # marshmallow validate
        table = self.registered_tables[schema.__name__]

    sql_query = sa.insert(table).values(data).returning(sa.text('id'))
    cursor = self.execute(sql_query, commit=True)

    return cursor.all()
def register_schema(self, schema: marshmallow.schema.Schema) ‑> str

Register schema

Expand source code
def register_schema(self, schema: Schema) -> str:
    """ Register schema """

    table = self._get_table_from_schema(schema)
    self.registered_tables[schema.__name__] = table
    self.registered_schemas[schema.__name__] = schema

    if not self.has_table(table.__tablename__):
        self.create_all()  # create table if doesn't exist

    return table.__tablename__
def update(self, schema_name: str, filters: dict, data: Union[dict, List[dict]], first: bool = False) ‑> sqlalchemy.engine.cursor.CursorResult
Expand source code
def update(self, schema_name: str, filters: dict, data: Union[dict, List[dict]],
           first: bool = False) -> CursorResult:
    """ """

    self._validate_on_update(schema_name, data)

    table = self.registered_tables[schema_name]

    counted_items = self.count(schema_name, **filters)
    if first is True:
        if counted_items != 1:
            raise ValueError("Can't update one item with given filters")

    filters_string = ", ".join([f"table.{field} == filters['{field}']" for field in filters])
    sql_query = sa.update(table).where(eval(filters_string)).values(**data)

    cursor = self.execute(sql_query, commit=True)

    return cursor
def update_many(self, schema_name: str, filters: dict, data: List[dict], validate_percentage: float = 1.0) ‑> sqlalchemy.engine.cursor.CursorResult

Update many TODO validate_percentage: what percentage of the list of data provided should be validated (0.5 is 50% of items in list)

Expand source code
def update_many(self, schema_name: str, filters: dict, data: List[dict],
                validate_percentage: float = 1.0) -> CursorResult:
    """
        Update many
        TODO `validate_percentage`: what percentage of the list of data provided should be validated (0.5 is 50% of items in list)
    """
    return self.update(schema_name, filters=filters, data=data)
def update_on_id(self, schema_name: str, id: Union[str, int], data: dict) ‑> sqlalchemy.engine.cursor.CursorResult

Update one item using the id

Expand source code
def update_on_id(self, schema_name: str, id: Union[str, int], data: dict) -> CursorResult:
    """ Update one item using the id """
    return self.update(schema_name, filters={'id': id}, data=data, first=True)
def update_one(self, schema_name: str, filters: dict, data: dict) ‑> sqlalchemy.engine.cursor.CursorResult

Update one item

Expand source code
def update_one(self, schema_name: str, filters: dict, data: dict) -> CursorResult:
    """ Update one item """
    return self.update(schema_name, filters=filters, data=data, first=True)

Inherited members