Module licenseware.dependencies.redis_cache

Expand source code
from datetime import timedelta
from typing import List, Optional, Set

from licenseware.common.constants import envs
from redis import Redis


class RedisCache:
    """Small convenience wrapper around a ``redis.Redis`` client.

    Connection settings that are not supplied explicitly are taken from
    ``envs`` (``REDIS_HOST``, ``REDIS_PORT``, ``REDIS_RESULT_CACHE_DB`` and
    ``REDIS_PASSWORD``).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        db: Optional[int] = None,
        password: Optional[str] = None,
    ):
        """Create the underlying Redis client.

        Args:
            host: Redis host; a falsy value falls back to ``envs.REDIS_HOST``.
            port: Redis port; a falsy value falls back to ``envs.REDIS_PORT``.
            db: Database index; only ``None`` falls back to
                ``envs.REDIS_RESULT_CACHE_DB``.
            password: Redis password; a falsy value falls back to
                ``envs.REDIS_PASSWORD``.
        """
        self.redis = Redis(
            host=host or envs.REDIS_HOST,
            port=port or envs.REDIS_PORT,
            # 0 is a valid database index, so compare against None instead of
            # relying on truthiness (``db or ...`` silently discarded db=0).
            db=envs.REDIS_RESULT_CACHE_DB if db is None else db,
            password=password or envs.REDIS_PASSWORD,
        )

    def get(self, key: str) -> Optional[bytes]:
        """Return whatever the client's ``get`` yields for ``key``.

        The client is created without ``decode_responses``, so per redis-py
        this is raw ``bytes``, or ``None`` when the key does not exist.
        """
        return self.redis.get(key)

    def set(self, key: str, value: bytes, expiry: int) -> bool:
        """Store ``value`` under ``key`` with a time-to-live.

        Args:
            key: Key to write.
            value: Payload; must already be ``bytes``.
            expiry: Time-to-live in seconds.

        Returns:
            The result of the client's ``set`` call.

        Raises:
            AssertionError: If ``value`` is not ``bytes``.
        """
        # Raised explicitly (same type and message as the former ``assert``)
        # so the check is not stripped when Python runs with ``-O``.
        if not isinstance(value, bytes):
            raise AssertionError("value must be bytes")
        return self.redis.set(key, value, ex=timedelta(seconds=expiry))

    def sadd(self, key: str, value: str) -> int:
        """Add ``value`` to the set at ``key`` and return the client's result.

        Per redis-py the result is the number of members actually added.
        """
        return self.redis.sadd(key, value)

    # ``Set`` is used instead of the builtin ``set`` because, inside this class
    # body, the bare name ``set`` refers to the ``set`` method defined above.
    def smembers(self, key: str) -> Set[bytes]:
        """Return the members of the set stored at ``key``."""
        return self.redis.smembers(key)

    def delete(self, *keys: str) -> int:
        """Delete the given keys and return the client's result.

        Per redis-py the result is the number of keys removed. Calling this
        without any key returns ``0`` without contacting the server, because
        Redis rejects a ``DEL`` command that has no arguments.
        """
        if not keys:
            return 0
        return self.redis.delete(*keys)

Classes

class RedisCache (host: str = None, port: int = None, db: int = None, password: str = None)
Expand source code
class RedisCache:
    """Small convenience wrapper around a ``redis.Redis`` client.

    Connection settings that are not supplied explicitly are taken from
    ``envs`` (``REDIS_HOST``, ``REDIS_PORT``, ``REDIS_RESULT_CACHE_DB`` and
    ``REDIS_PASSWORD``).
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        db: Optional[int] = None,
        password: Optional[str] = None,
    ):
        """Create the underlying Redis client.

        Args:
            host: Redis host; a falsy value falls back to ``envs.REDIS_HOST``.
            port: Redis port; a falsy value falls back to ``envs.REDIS_PORT``.
            db: Database index; only ``None`` falls back to
                ``envs.REDIS_RESULT_CACHE_DB``.
            password: Redis password; a falsy value falls back to
                ``envs.REDIS_PASSWORD``.
        """
        self.redis = Redis(
            host=host or envs.REDIS_HOST,
            port=port or envs.REDIS_PORT,
            # 0 is a valid database index, so compare against None instead of
            # relying on truthiness (``db or ...`` silently discarded db=0).
            db=envs.REDIS_RESULT_CACHE_DB if db is None else db,
            password=password or envs.REDIS_PASSWORD,
        )

    def get(self, key: str) -> Optional[bytes]:
        """Return whatever the client's ``get`` yields for ``key``.

        The client is created without ``decode_responses``, so per redis-py
        this is raw ``bytes``, or ``None`` when the key does not exist.
        """
        return self.redis.get(key)

    def set(self, key: str, value: bytes, expiry: int) -> bool:
        """Store ``value`` under ``key`` with a time-to-live.

        Args:
            key: Key to write.
            value: Payload; must already be ``bytes``.
            expiry: Time-to-live in seconds.

        Returns:
            The result of the client's ``set`` call.

        Raises:
            AssertionError: If ``value`` is not ``bytes``.
        """
        # Raised explicitly (same type and message as the former ``assert``)
        # so the check is not stripped when Python runs with ``-O``.
        if not isinstance(value, bytes):
            raise AssertionError("value must be bytes")
        return self.redis.set(key, value, ex=timedelta(seconds=expiry))

    def sadd(self, key: str, value: str) -> int:
        """Add ``value`` to the set at ``key`` and return the client's result.

        Per redis-py the result is the number of members actually added.
        """
        return self.redis.sadd(key, value)

    # ``Set`` is used instead of the builtin ``set`` because, inside this class
    # body, the bare name ``set`` refers to the ``set`` method defined above.
    def smembers(self, key: str) -> Set[bytes]:
        """Return the members of the set stored at ``key``."""
        return self.redis.smembers(key)

    def delete(self, *keys: str) -> int:
        """Delete the given keys and return the client's result.

        Per redis-py the result is the number of keys removed. Calling this
        without any key returns ``0`` without contacting the server, because
        Redis rejects a ``DEL`` command that has no arguments.
        """
        if not keys:
            return 0
        return self.redis.delete(*keys)

Methods

def delete(self, *keys: List[str]) ‑> bool
Expand source code
def delete(self, *keys: str) -> int:
    """Delete the given keys and return the client's result.

    Each positional argument is one key name (the former ``List[str]``
    annotation described every argument as a list, which the call does not
    expect). Per redis-py the result is the number of keys removed.

    Calling this without any key returns ``0`` without contacting the server,
    because Redis rejects a ``DEL`` command that has no arguments.
    """
    if not keys:
        return 0
    return self.redis.delete(*keys)
def get(self, key: str) ‑> str
Expand source code
def get(self, key: str) -> Optional[bytes]:
    """Return whatever the client's ``get`` yields for ``key``.

    The client is created without ``decode_responses``, so per redis-py the
    result is raw ``bytes``, or ``None`` when the key does not exist — not
    ``str`` as previously annotated.
    """
    return self.redis.get(key)
def sadd(self, key: str, value: str) ‑> bool
Expand source code
def sadd(self, key: str, value: str) -> int:
    """Add ``value`` to the set at ``key`` and return the client's result.

    Per redis-py the result is the number of members actually added (``0``
    when ``value`` was already present), hence ``int`` rather than ``bool``.
    """
    return self.redis.sadd(key, value)
def set(self, key: str, value: bytes, expiry: int) ‑> bool
Expand source code
def set(self, key: str, value: bytes, expiry: int) -> bool:
    """Store ``value`` under ``key`` with a time-to-live.

    Args:
        key: Key to write.
        value: Payload; must already be ``bytes``.
        expiry: Time-to-live in seconds.

    Returns:
        The result of the client's ``set`` call.

    Raises:
        AssertionError: If ``value`` is not ``bytes``.
    """
    # Raised explicitly (same type and message as the former ``assert``) so
    # the check is not stripped when Python runs with ``-O``.
    if not isinstance(value, bytes):
        raise AssertionError("value must be bytes")
    return self.redis.set(key, value, ex=timedelta(seconds=expiry))
def smembers(self, key: str) ‑> set
Expand source code
# ``Set`` is used instead of the builtin ``set`` because this is a method of a
# class that also defines a ``set`` method; inside that class body the bare
# name ``set`` in an annotation resolves to the method, not the builtin type.
def smembers(self, key: str) -> Set[bytes]:
    """Return the members of the set stored at ``key``.

    The value comes straight from the client's ``smembers`` call; with a
    client created without ``decode_responses`` the members are ``bytes``.
    """
    return self.redis.smembers(key)