156 lines
5.0 KiB
Python
156 lines
5.0 KiB
Python
from __future__ import annotations
|
|
|
|
from django.contrib.auth.models import Group, Permission
|
|
from guardian.shortcuts import assign_perm
|
|
from ninja.errors import HttpError
|
|
|
|
# Canonical role names. Each one is also used as the name of the auth Group
# that represents the role (see ensure_role_groups).
ROLE_ADMIN = "administrator"
ROLE_OPERATOR = "operator"
ROLE_AUDITOR = "auditor"
ROLE_USER = "user"

# Roles listed in lookup order: get_user_role returns the first entry of this
# tuple that matches one of the user's groups.
ROLE_ORDER = (ROLE_ADMIN, ROLE_OPERATOR, ROLE_AUDITOR, ROLE_USER)

# Alias kept for callers that want "every role"; same tuple object as ROLE_ORDER.
ROLE_ALL = ROLE_ORDER

# Alternative spellings accepted by normalize_role, mapped to canonical names.
ROLE_ALIASES = {"admin": ROLE_ADMIN}

# Every spelling accepted as input (canonical names plus aliases), sorted.
ROLE_INPUTS = tuple(sorted({*ROLE_ORDER, *ROLE_ALIASES}))
|
|
|
|
def _model_perms(app_label: str, model: str, actions: list[str]) -> list[str]:
|
|
return [f"{app_label}.{action}_{model}" for action in actions]
|
|
|
|
|
|
# Model-level permission codes ("app_label.codename") granted to each role.
ROLE_PERMISSIONS = {
    # Left empty on purpose: assign_role_permissions gives the administrator
    # group every Permission row instead of an explicit list.
    ROLE_ADMIN: [],
    ROLE_OPERATOR: (
        _model_perms("servers", "server", ["view"])
        + _model_perms("access", "accessrequest", ["add", "view", "change", "delete"])
        + _model_perms("keys", "sshkey", ["add", "view", "change", "delete"])
        + _model_perms("telemetry", "telemetryevent", ["add", "view"])
        + _model_perms("audit", "auditlog", ["view"])
        + _model_perms("audit", "auditeventtype", ["view"])
        + _model_perms("auth", "user", ["add", "view"])
    ),
    ROLE_AUDITOR: (
        _model_perms("audit", "auditlog", ["view"])
        + _model_perms("audit", "auditeventtype", ["view"])
    ),
    ROLE_USER: (
        _model_perms("servers", "server", ["view"])
        + _model_perms("access", "accessrequest", ["add"])
        + _model_perms("keys", "sshkey", ["add"])
    ),
}
|
|
|
|
# (app_label, model_name) pairs whose instances receive per-object permissions
# in assign_default_object_permissions; any other model is ignored there.
OBJECT_PERMISSION_MODELS = {
    ("access", "accessrequest"),
    ("keys", "sshkey"),
    ("servers", "server"),
}
|
|
|
|
|
|
def normalize_role(role: str) -> str:
    """Return the canonical form of a role name.

    The input is stripped of surrounding whitespace and lower-cased; a falsy
    input becomes the empty string. Known aliases in ``ROLE_ALIASES`` are
    mapped to their canonical name. The result is NOT validated against
    ``ROLE_ORDER`` - unknown names are returned as cleaned.
    """
    cleaned = role.strip().lower() if role else ""
    if cleaned in ROLE_ALIASES:
        return ROLE_ALIASES[cleaned]
    return cleaned
|
|
|
|
|
|
def ensure_role_groups() -> None:
    """Make sure an auth ``Group`` exists for every role in ``ROLE_ORDER``.

    Existing groups are left as they are; missing ones are created.
    """
    for role_name in ROLE_ORDER:
        # get_or_create: only inserts when no group with this name exists yet.
        Group.objects.get_or_create(name=role_name)
|
|
|
|
|
|
def assign_role_permissions() -> None:
    """Reset every role group's model-level permissions from ROLE_PERMISSIONS.

    The role groups are created first if needed. The administrator group is
    given every existing ``Permission``. Every other role group has its
    permission set replaced by the permissions named in ``ROLE_PERMISSIONS``.

    Permission codes that are malformed (no ``"."``) or that do not match an
    existing ``Permission`` row are skipped, as before, but each skip is now
    logged as a warning so a typo or a not-yet-created permission does not
    silently leave a role under-privileged.
    """
    # Local import so the function does not rely on a module-level logger.
    import logging

    logger = logging.getLogger(__name__)

    ensure_role_groups()
    for role, perm_codes in ROLE_PERMISSIONS.items():
        group = Group.objects.get(name=role)
        if role == ROLE_ADMIN:
            # Administrators get everything, not an explicit list.
            group.permissions.set(Permission.objects.all())
            continue

        perms = []
        for code in perm_codes:
            app_label, dot, codename = code.partition(".")
            if not dot:
                logger.warning(
                    "Skipping malformed permission code %r for role %r "
                    "(expected 'app_label.codename')",
                    code,
                    role,
                )
                continue
            # Keep the try body to the single call that can raise.
            try:
                perm = Permission.objects.get(
                    content_type__app_label=app_label,
                    codename=codename,
                )
            except Permission.DoesNotExist:
                logger.warning(
                    "Permission %r for role %r does not exist; skipping",
                    code,
                    role,
                )
                continue
            perms.append(perm)

        # set() replaces the group's permissions rather than adding to them.
        group.permissions.set(perms)
|
|
|
|
|
|
def assign_default_object_permissions(instance) -> None:
    """Grant role groups their per-object permissions on *instance*.

    Does nothing unless the instance's ``(app_label, model_name)`` is listed
    in ``OBJECT_PERMISSION_MODELS``. Otherwise, for every non-administrator
    role, each code in ``ROLE_PERMISSIONS`` that targets this model is
    assigned to the role's group on this object via guardian's
    ``assign_perm``. ``add_*`` codes are skipped.
    """
    app_label = instance._meta.app_label
    model_name = instance._meta.model_name
    if (app_label, model_name) not in OBJECT_PERMISSION_MODELS:
        return

    ensure_role_groups()
    groups_by_name = {
        grp.name: grp for grp in Group.objects.filter(name__in=ROLE_ORDER)
    }
    model_suffix = f"_{model_name}"

    for role, perm_codes in ROLE_PERMISSIONS.items():
        # The administrator role is never given object-level grants here.
        group = None if role == ROLE_ADMIN else groups_by_name.get(role)
        if not group:
            continue
        for code in perm_codes:
            perm_app, dot, codename = code.partition(".")
            targets_this_model = (
                bool(dot)
                and perm_app == app_label
                and codename.endswith(model_suffix)
            )
            # add_* codes are excluded from object-level assignment.
            if targets_this_model and not codename.startswith("add_"):
                assign_perm(code, group, instance)
|
|
|
|
|
|
def get_user_role(user, default: str = ROLE_USER) -> str | None:
    """Resolve the role name that applies to *user*.

    Returns:
        ``None`` when *user* is falsy or not authenticated;
        ``ROLE_ADMIN`` when the user is a superuser;
        otherwise the first role in ``ROLE_ORDER`` whose name matches one of
        the user's groups, or *default* when none matches.
    """
    if not (user and getattr(user, "is_authenticated", False)):
        return None
    if getattr(user, "is_superuser", False):
        return ROLE_ADMIN

    memberships = set(user.groups.values_list("name", flat=True))
    # ROLE_ORDER decides which role wins when the user is in several groups.
    return next((name for name in ROLE_ORDER if name in memberships), default)
|
|
|
|
|
|
def set_user_role(user, role: str) -> str:
    """Move *user* into exactly one role group and set matching flags.

    The role name is normalised first (aliases accepted). The user is removed
    from all role groups, added to the target one, and ``is_staff`` /
    ``is_superuser`` are set on the instance: both for administrators, staff
    only for operators and auditors, neither for plain users.

    Note: the flag changes are made on the in-memory instance only; this
    function does not call ``user.save()``.

    Returns:
        The canonical role name that was applied.

    Raises:
        ValueError: if the normalised role is not in ``ROLE_ORDER``.
    """
    canonical = normalize_role(role)
    if canonical not in ROLE_ORDER:
        raise ValueError(f"Invalid role: {role}")

    ensure_role_groups()

    # Drop every existing role membership so the user ends up in one role.
    existing_role_groups = list(Group.objects.filter(name__in=ROLE_ORDER))
    if existing_role_groups:
        user.groups.remove(*existing_role_groups)
    user.groups.add(Group.objects.get(name=canonical))

    if canonical == ROLE_ADMIN:
        staff, superuser = True, True
    elif canonical in (ROLE_OPERATOR, ROLE_AUDITOR):
        staff, superuser = True, False
    else:
        staff, superuser = False, False
    user.is_staff = staff
    user.is_superuser = superuser
    return canonical
|
|
|
|
|
|
def require_authenticated(request) -> None:
    """Reject the request unless it carries an authenticated user.

    Raises:
        HttpError: 403 "Forbidden" when ``request.user`` is missing, falsy,
            or not authenticated.
    """
    # NOTE(review): unauthenticated callers receive 403 rather than 401 here;
    # confirm that this is the intended API contract.
    principal = getattr(request, "user", None)
    logged_in = bool(principal) and getattr(principal, "is_authenticated", False)
    if not logged_in:
        raise HttpError(403, "Forbidden")
|
|
|
|
|
|
def require_perms(request, *perms: str) -> None:
    """Reject the request unless its user holds every permission in *perms*.

    Args:
        request: Object expected to expose a ``user`` attribute.
        *perms: Permission codes passed (as a tuple) to ``user.has_perms``.

    Raises:
        HttpError: 403 "Forbidden" when the user is missing, falsy, not
            authenticated, or ``user.has_perms(perms)`` is false.
    """
    principal = getattr(request, "user", None)
    # Short-circuit order matters: has_perms is only consulted for an
    # authenticated user.
    if (
        not principal
        or not getattr(principal, "is_authenticated", False)
        or not principal.has_perms(perms)
    ):
        raise HttpError(403, "Forbidden")
|