Module licenseware.utils.flask_dramatiq

This is just a refactor of flask-dramatiq https://flask-dramatiq.readthedocs.io/en/latest/

Customized to be used with Redis database.

Usage:


from licenseware.utils.flask_dramatiq import Dramatiq

broker = Dramatiq(
    host=os.getenv('REDIS_HOST'),
    port=os.getenv('REDIS_PORT'),
    db=os.getenv('REDIS_DB'),
    password=os.getenv('REDIS_PASSWORD')
)


You can use broker object to decorate function workers The broker instantiation is already done by the licenseware sdk

You can use it like bellow


from licenseware.utils.dramatiq_redis_broker import broker


@broker.actor()
def worker_func(event):
    return "some data"

The broker is used in UploaderBuilder to decorate worker_functions. The AppBuilder class will wrap the worker with the flask app context.

Background worker can be started using the default dramatiq CLI or via sdk


dramatiq main:App.broker -p4 --watch ./ --queues odb

Using the licenseware CLI if DEBUG is found true in evironment variables it will start with the --watch flag.


licenseware start-background-worker

Expand source code
"""

This is just a refactor of flask-dramatiq https://flask-dramatiq.readthedocs.io/en/latest/

Customized to be used with Redis database.


Usage:

```py

from licenseware.utils.flask_dramatiq import Dramatiq

broker = Dramatiq(
    host=os.getenv('REDIS_HOST'),
    port=os.getenv('REDIS_PORT'),
    db=os.getenv('REDIS_DB'),
    password=os.getenv('REDIS_PASSWORD')
)


```

You can use `broker` object to decorate function workers
The broker instantiation is already done by the licenseware sdk

You can use it like bellow

```py

from licenseware.utils.dramatiq_redis_broker import broker


@broker.actor()
def worker_func(event):
    return "some data"
    
```

The broker is used in `UploaderBuilder` to decorate `worker_functions`.
The `AppBuilder` class will wrap the worker with the flask app context.

Background worker can be started using the default dramatiq CLI or via sdk

```bash

dramatiq main:App.broker -p4 --watch ./ --queues odb

```

Using the licenseware CLI if DEBUG is found true in evironment variables it will start with the `--watch` flag.

```bash

licenseware start-background-worker

```



"""

from flask import Flask
from threading import local
from dramatiq import set_broker
from dramatiq import Middleware
from dramatiq import actor as register_actor
from dramatiq.middleware import default_middleware
from dramatiq.brokers.redis import RedisBroker
from licenseware.common.constants import envs

from .logger import log

# PREPS

class AppContextMiddleware(Middleware):
    # Setup Flask app for actor. Borrowed from
    # https://github.com/Bogdanp/flask_dramatiq_example.

    state = local()

    def __init__(self, app):
        self.app = app

    def before_process_message(self, broker, message):
        context = self.app.app_context()
        context.push()

        self.state.context = context

    def after_process_message(
            self, broker, message, *, result=None, exception=None):
        try:
            context = self.state.context
            context.pop(exception)
            del self.state.context
        except AttributeError:
            pass

    after_skip_message = after_process_message



class LazyActor(object):
    # Intermediate object that register actor on broker an call.

    def __init__(self, extension, fn, kw):
        self.extension = extension
        self.fn = fn
        self.kw = kw
        self.actor = None

    def __call__(self, *a, **kw):
        return self.fn(*a, **kw)

    def __repr__(self):
        return '<%s %s.%s>' % (
            self.__class__.__name__,
            self.fn.__module__, self.fn.__name__,
        )

    def __getattr__(self, name):
        if not self.actor:
            raise AttributeError(name)
        return getattr(self.actor, name)

    def register(self, broker):
        self.actor = register_actor(broker=broker, **self.kw)(self.fn)

    # Next is regular actor API.

    def send(self, *a, **kw):
        if envs.USE_BACKGROUND_WORKER:
            return self.actor.send(*a, **kw)
        return self.actor(*a, **kw)


    def send_with_options(self, *a, **kw):
        if envs.USE_BACKGROUND_WORKER:
            return self.actor.send_with_options(*a, **kw)
        return self.actor(*a, **kw)


# BROKER    

class Dramatiq:
    
    def __init__(
        self, 
        *,  
        app:Flask = None, 
        host:str = None, 
        port:int = None, 
        db:int = 0,
        password:str = None,
        middleware: list = None
    ):
        
        self.app = None
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.actors = []
        
        #Removing prometheus middleware
        default_middleware.pop(0)
        if middleware is None: middleware = [m() for m in default_middleware]
        self.middleware = middleware

        if app: self.init_app(app)
        
        
    def add_middleware(self, mdw):
        self.middleware.append(mdw)
        
        
    def add_app_to_self(self, app: Flask):
        if self.app is not None: 
            raise Exception("Flask 'app' can be provided only on 'init_app' or 'Dramatiq' class instantiation")
        self.app = app
        
        
    def init_app(self, app:Flask):
        self.add_app_to_self(app)
        
        middleware = [AppContextMiddleware(self.app)] + self.middleware
        
        self.broker = RedisBroker(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            middleware=middleware
        )
            
        for actor in self.actors:
            actor.register(broker=self.broker)
            
        set_broker(self.broker)
        
        self.show_registered_actors()
        
        return self.broker
 
 
    def show_registered_actors(self):
        
        registered_actors = []    
        for uploader_id, actor_obj in self.broker.actors.items():
            registered_actors.append(f"{uploader_id}/{actor_obj.queue_name}")
            
        log.info("-------- Dramatiq Actors: " + " : ".join(registered_actors) + "---------")
        
    
    def actor(self, fn=None, **kw):

        def decorator(fn):
            lazy_actor = LazyActor(self, fn, kw)
            self.actors.append(lazy_actor)
            if self.app:
                lazy_actor.register(self.broker)
            return lazy_actor

        if fn: return decorator(fn)
        return decorator

Classes

class AppContextMiddleware (app)

Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.

Expand source code
class AppContextMiddleware(Middleware):
    # Setup Flask app for actor. Borrowed from
    # https://github.com/Bogdanp/flask_dramatiq_example.

    state = local()

    def __init__(self, app):
        self.app = app

    def before_process_message(self, broker, message):
        context = self.app.app_context()
        context.push()

        self.state.context = context

    def after_process_message(
            self, broker, message, *, result=None, exception=None):
        try:
            context = self.state.context
            context.pop(exception)
            del self.state.context
        except AttributeError:
            pass

    after_skip_message = after_process_message

Ancestors

  • dramatiq.middleware.middleware.Middleware

Class variables

var state

Methods

def after_process_message(self, broker, message, *, result=None, exception=None)

Called after a message has been processed.

Expand source code
def after_process_message(
        self, broker, message, *, result=None, exception=None):
    try:
        context = self.state.context
        context.pop(exception)
        del self.state.context
    except AttributeError:
        pass
def after_skip_message(self, broker, message, *, result=None, exception=None)

Called after a message has been processed.

Expand source code
def after_process_message(
        self, broker, message, *, result=None, exception=None):
    try:
        context = self.state.context
        context.pop(exception)
        del self.state.context
    except AttributeError:
        pass
def before_process_message(self, broker, message)

Called before a message is processed.

Raises

SkipMessage
If the current message should be skipped. When this is raised, after_skip_message is emitted instead of after_process_message.
Expand source code
def before_process_message(self, broker, message):
    context = self.app.app_context()
    context.push()

    self.state.context = context
class Dramatiq (*, app: flask.app.Flask = None, host: str = None, port: int = None, db: int = 0, password: str = None, middleware: list = None)
Expand source code
class Dramatiq:
    
    def __init__(
        self, 
        *,  
        app:Flask = None, 
        host:str = None, 
        port:int = None, 
        db:int = 0,
        password:str = None,
        middleware: list = None
    ):
        
        self.app = None
        self.host = host
        self.port = port
        self.db = db
        self.password = password
        self.actors = []
        
        #Removing prometheus middleware
        default_middleware.pop(0)
        if middleware is None: middleware = [m() for m in default_middleware]
        self.middleware = middleware

        if app: self.init_app(app)
        
        
    def add_middleware(self, mdw):
        self.middleware.append(mdw)
        
        
    def add_app_to_self(self, app: Flask):
        if self.app is not None: 
            raise Exception("Flask 'app' can be provided only on 'init_app' or 'Dramatiq' class instantiation")
        self.app = app
        
        
    def init_app(self, app:Flask):
        self.add_app_to_self(app)
        
        middleware = [AppContextMiddleware(self.app)] + self.middleware
        
        self.broker = RedisBroker(
            host=self.host,
            port=self.port,
            db=self.db,
            password=self.password,
            middleware=middleware
        )
            
        for actor in self.actors:
            actor.register(broker=self.broker)
            
        set_broker(self.broker)
        
        self.show_registered_actors()
        
        return self.broker
 
 
    def show_registered_actors(self):
        
        registered_actors = []    
        for uploader_id, actor_obj in self.broker.actors.items():
            registered_actors.append(f"{uploader_id}/{actor_obj.queue_name}")
            
        log.info("-------- Dramatiq Actors: " + " : ".join(registered_actors) + "---------")
        
    
    def actor(self, fn=None, **kw):

        def decorator(fn):
            lazy_actor = LazyActor(self, fn, kw)
            self.actors.append(lazy_actor)
            if self.app:
                lazy_actor.register(self.broker)
            return lazy_actor

        if fn: return decorator(fn)
        return decorator

Methods

def actor(self, fn=None, **kw)
Expand source code
def actor(self, fn=None, **kw):

    def decorator(fn):
        lazy_actor = LazyActor(self, fn, kw)
        self.actors.append(lazy_actor)
        if self.app:
            lazy_actor.register(self.broker)
        return lazy_actor

    if fn: return decorator(fn)
    return decorator
def add_app_to_self(self, app: flask.app.Flask)
Expand source code
def add_app_to_self(self, app: Flask):
    if self.app is not None: 
        raise Exception("Flask 'app' can be provided only on 'init_app' or 'Dramatiq' class instantiation")
    self.app = app
def add_middleware(self, mdw)
Expand source code
def add_middleware(self, mdw):
    self.middleware.append(mdw)
def init_app(self, app: flask.app.Flask)
Expand source code
def init_app(self, app:Flask):
    self.add_app_to_self(app)
    
    middleware = [AppContextMiddleware(self.app)] + self.middleware
    
    self.broker = RedisBroker(
        host=self.host,
        port=self.port,
        db=self.db,
        password=self.password,
        middleware=middleware
    )
        
    for actor in self.actors:
        actor.register(broker=self.broker)
        
    set_broker(self.broker)
    
    self.show_registered_actors()
    
    return self.broker
def show_registered_actors(self)
Expand source code
def show_registered_actors(self):
    
    registered_actors = []    
    for uploader_id, actor_obj in self.broker.actors.items():
        registered_actors.append(f"{uploader_id}/{actor_obj.queue_name}")
        
    log.info("-------- Dramatiq Actors: " + " : ".join(registered_actors) + "---------")
class LazyActor (extension, fn, kw)
Expand source code
class LazyActor(object):
    # Intermediate object that register actor on broker an call.

    def __init__(self, extension, fn, kw):
        self.extension = extension
        self.fn = fn
        self.kw = kw
        self.actor = None

    def __call__(self, *a, **kw):
        return self.fn(*a, **kw)

    def __repr__(self):
        return '<%s %s.%s>' % (
            self.__class__.__name__,
            self.fn.__module__, self.fn.__name__,
        )

    def __getattr__(self, name):
        if not self.actor:
            raise AttributeError(name)
        return getattr(self.actor, name)

    def register(self, broker):
        self.actor = register_actor(broker=broker, **self.kw)(self.fn)

    # Next is regular actor API.

    def send(self, *a, **kw):
        if envs.USE_BACKGROUND_WORKER:
            return self.actor.send(*a, **kw)
        return self.actor(*a, **kw)


    def send_with_options(self, *a, **kw):
        if envs.USE_BACKGROUND_WORKER:
            return self.actor.send_with_options(*a, **kw)
        return self.actor(*a, **kw)

Methods

def register(self, broker)
Expand source code
def register(self, broker):
    self.actor = register_actor(broker=broker, **self.kw)(self.fn)
def send(self, *a, **kw)
Expand source code
def send(self, *a, **kw):
    if envs.USE_BACKGROUND_WORKER:
        return self.actor.send(*a, **kw)
    return self.actor(*a, **kw)
def send_with_options(self, *a, **kw)
Expand source code
def send_with_options(self, *a, **kw):
    if envs.USE_BACKGROUND_WORKER:
        return self.actor.send_with_options(*a, **kw)
    return self.actor(*a, **kw)