Module licenseware.test_helpers.auth
The AuthHelper
class can be used to obtain authorization headers from auth service + other interactions with auth.
from licenseware.test_helpers.auth import AuthHelper
Here we provide an email and call the get_auth_headers
method
which will return the auth headers (TenantId and Authorization) needed for making a request auth
auth_headers = AuthHelper(main_admin).get_auth_headers()
Here we are accepting the invitation on a tenant_id(project)
for email in [admin_email1, user_email1]:
response = AuthHelper(email).accept_invite_on_tenant(auth_headers['TenantId'])
assert response
We can set a default tenant/project
auth_headers = AuthHelper(email, default_tenant="1234").get_auth_header()
This way the default_tenant
will be used for the request
Expand source code
"""
The `AuthHelper` class can be used to obtain authorization headers from auth service + other interactions with auth.
```py
from licenseware.test_helpers.auth import AuthHelper
```
Here we provide an email and call the `get_auth_headers` method
which will return the auth headers (TenantId and Authorization) needed for making a request auth
```py
auth_headers = AuthHelper(main_admin).get_auth_headers()
```
Here we are accepting the invitation on a tenant_id(project)
```py
for email in [admin_email1, user_email1]:
response = AuthHelper(email).accept_invite_on_tenant(auth_headers['TenantId'])
assert response
```
We can set a default tenant/project
```py
auth_headers = AuthHelper(email, default_tenant="1234").get_auth_header()
```
This way the `default_tenant` will be used for the request
"""
import requests
from licenseware.utils.logger import log
from licenseware.common.constants import envs
from licenseware.decorators.auth_decorators import authenticated_machine
class AuthHelper:
def __init__(self, email: str, password: str = "secret", default_tenant: str = None):
self.email = email
self.password = password
self.default_tenant = default_tenant
self.auth_headers = self.get_auth_headers()
@authenticated_machine
def get_auth_headers(self):
"""
Get required auth headers needed to make a request as a user
"""
login_response = self.login_user(self.email, self.password)
if login_response['status'] == 'success':
login_response.pop('status')
login_response.pop('message')
log.success(login_response)
if self.default_tenant is not None:
login_response['TenantId'] = self.default_tenant
return login_response
else:
self.create_user(self.email, self.password)
return self.get_auth_headers()
def accept_invite_on_tenant(self, tenant_id: str):
log.debug(f"Getting invite token for '{self.email}' on tenant '{tenant_id}'")
invite_token = self.get_invite_token(tenant_id)
if invite_token is None:
raise Exception("No invite token found")
log.debug(f"Logging '{self.email}' with invite token: '{invite_token}'")
response = requests.post(
envs.AUTH_SERVICE_URL + '/login',
json={"email": self.email, "password": self.password},
params={'invite_token': invite_token}
)
log.warning(f"Response from logging with invite token: {response.content}")
return response.json()
def get_invite_token(self, tenant_id: str):
shared_tenants = self.get_auth_tables('shared_tenants')
for st in shared_tenants:
if st['invited_email'] == self.email and st['tenant_id'] == tenant_id:
log.success(f"Found invite token '{st['invite_token']}'")
return st['invite_token']
def get_auth_tables(self, table: str = None):
"""
Tables: users, tenants, shared_tenants
"""
response = requests.get(
envs.AUTH_SERVICE_URL + '/users/tables',
headers=self.auth_headers
)
log.debug(f"User's tables: {response.json()}")
if table:
return response.json()[table]
return response.json()
@staticmethod
def login_user(email: str, password: str):
response = requests.post(
envs.AUTH_SERVICE_URL + '/login',
json={"email": email, "password": password}
)
# log.debug(response.content)
return response.json()
@staticmethod
def create_user(
email: str,
password: str,
first_name: str = None,
last_name: str = None,
company_name: str = None,
job_title: str = None
):
payload = {
"email": email,
"password": password,
"first_name": first_name or email.split("@")[0],
"last_name": last_name or "",
"company_name": company_name or email.split("@")[1].split('.')[0],
"job_title": job_title or ""
}
response = requests.post(
envs.AUTH_SERVICE_URL + '/users/register',
json=payload
)
# log.debug(response)
return response.json()
Classes
class AuthHelper (email: str, password: str = 'secret', default_tenant: str = None)
-
Expand source code
class AuthHelper: def __init__(self, email: str, password: str = "secret", default_tenant: str = None): self.email = email self.password = password self.default_tenant = default_tenant self.auth_headers = self.get_auth_headers() @authenticated_machine def get_auth_headers(self): """ Get required auth headers needed to make a request as a user """ login_response = self.login_user(self.email, self.password) if login_response['status'] == 'success': login_response.pop('status') login_response.pop('message') log.success(login_response) if self.default_tenant is not None: login_response['TenantId'] = self.default_tenant return login_response else: self.create_user(self.email, self.password) return self.get_auth_headers() def accept_invite_on_tenant(self, tenant_id: str): log.debug(f"Getting invite token for '{self.email}' on tenant '{tenant_id}'") invite_token = self.get_invite_token(tenant_id) if invite_token is None: raise Exception("No invite token found") log.debug(f"Logging '{self.email}' with invite token: '{invite_token}'") response = requests.post( envs.AUTH_SERVICE_URL + '/login', json={"email": self.email, "password": self.password}, params={'invite_token': invite_token} ) log.warning(f"Response from logging with invite token: {response.content}") return response.json() def get_invite_token(self, tenant_id: str): shared_tenants = self.get_auth_tables('shared_tenants') for st in shared_tenants: if st['invited_email'] == self.email and st['tenant_id'] == tenant_id: log.success(f"Found invite token '{st['invite_token']}'") return st['invite_token'] def get_auth_tables(self, table: str = None): """ Tables: users, tenants, shared_tenants """ response = requests.get( envs.AUTH_SERVICE_URL + '/users/tables', headers=self.auth_headers ) log.debug(f"User's tables: {response.json()}") if table: return response.json()[table] return response.json() @staticmethod def login_user(email: str, password: str): response = requests.post( envs.AUTH_SERVICE_URL + '/login', json={"email": email, "password": password} ) # log.debug(response.content) return response.json() @staticmethod def create_user( email: str, password: str, first_name: str = None, last_name: str = None, company_name: str = None, job_title: str = None ): payload = { "email": email, "password": password, "first_name": first_name or email.split("@")[0], "last_name": last_name or "", "company_name": company_name or email.split("@")[1].split('.')[0], "job_title": job_title or "" } response = requests.post( envs.AUTH_SERVICE_URL + '/users/register', json=payload ) # log.debug(response) return response.json()
Static methods
def create_user(email: str, password: str, first_name: str = None, last_name: str = None, company_name: str = None, job_title: str = None)
-
Expand source code
@staticmethod def create_user( email: str, password: str, first_name: str = None, last_name: str = None, company_name: str = None, job_title: str = None ): payload = { "email": email, "password": password, "first_name": first_name or email.split("@")[0], "last_name": last_name or "", "company_name": company_name or email.split("@")[1].split('.')[0], "job_title": job_title or "" } response = requests.post( envs.AUTH_SERVICE_URL + '/users/register', json=payload ) # log.debug(response) return response.json()
def login_user(email: str, password: str)
-
Expand source code
@staticmethod def login_user(email: str, password: str): response = requests.post( envs.AUTH_SERVICE_URL + '/login', json={"email": email, "password": password} ) # log.debug(response.content) return response.json()
Methods
def accept_invite_on_tenant(self, tenant_id: str)
-
Expand source code
def accept_invite_on_tenant(self, tenant_id: str): log.debug(f"Getting invite token for '{self.email}' on tenant '{tenant_id}'") invite_token = self.get_invite_token(tenant_id) if invite_token is None: raise Exception("No invite token found") log.debug(f"Logging '{self.email}' with invite token: '{invite_token}'") response = requests.post( envs.AUTH_SERVICE_URL + '/login', json={"email": self.email, "password": self.password}, params={'invite_token': invite_token} ) log.warning(f"Response from logging with invite token: {response.content}") return response.json()
def get_auth_headers(self)
-
Get required auth headers needed to make a request as a user
Expand source code
@authenticated_machine def get_auth_headers(self): """ Get required auth headers needed to make a request as a user """ login_response = self.login_user(self.email, self.password) if login_response['status'] == 'success': login_response.pop('status') login_response.pop('message') log.success(login_response) if self.default_tenant is not None: login_response['TenantId'] = self.default_tenant return login_response else: self.create_user(self.email, self.password) return self.get_auth_headers()
def get_auth_tables(self, table: str = None)
-
Tables: users, tenants, shared_tenants
Expand source code
def get_auth_tables(self, table: str = None): """ Tables: users, tenants, shared_tenants """ response = requests.get( envs.AUTH_SERVICE_URL + '/users/tables', headers=self.auth_headers ) log.debug(f"User's tables: {response.json()}") if table: return response.json()[table] return response.json()
def get_invite_token(self, tenant_id: str)
-
Expand source code
def get_invite_token(self, tenant_id: str): shared_tenants = self.get_auth_tables('shared_tenants') for st in shared_tenants: if st['invited_email'] == self.email and st['tenant_id'] == tenant_id: log.success(f"Found invite token '{st['invite_token']}'") return st['invite_token']