Attempting to resolve unfold form inconsistencies.
This commit is contained in:
@@ -1,11 +1,22 @@
|
||||
from django.contrib import admin
|
||||
from guardian.admin import GuardedModelAdmin
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.html import format_html
|
||||
try:
|
||||
from unfold.contrib.guardian.admin import GuardedModelAdmin
|
||||
except ImportError: # Fallback for older Unfold builds without guardian admin shim.
|
||||
from guardian.admin import GuardedModelAdmin as GuardianGuardedModelAdmin
|
||||
from unfold.admin import ModelAdmin as UnfoldModelAdmin
|
||||
|
||||
class GuardedModelAdmin(GuardianGuardedModelAdmin, UnfoldModelAdmin):
|
||||
pass
|
||||
|
||||
from .models import AccessRequest
|
||||
|
||||
|
||||
@admin.register(AccessRequest)
|
||||
class AccessRequestAdmin(GuardedModelAdmin):
|
||||
autocomplete_fields = ("requester", "server", "decided_by")
|
||||
list_display = (
|
||||
"id",
|
||||
"requester",
|
||||
@@ -14,7 +25,75 @@ class AccessRequestAdmin(GuardedModelAdmin):
|
||||
"requested_at",
|
||||
"expires_at",
|
||||
"decided_by",
|
||||
"delete_link",
|
||||
)
|
||||
list_filter = ("status", "server")
|
||||
search_fields = ("requester__username", "requester__email", "server__display_name")
|
||||
ordering = ("-requested_at",)
|
||||
compressed_fields = True
|
||||
actions_on_top = True
|
||||
actions_on_bottom = True
|
||||
def get_readonly_fields(self, request, obj=None):
|
||||
readonly = ["requested_at"]
|
||||
if obj:
|
||||
readonly.extend(["decided_at", "decided_by"])
|
||||
return readonly
|
||||
|
||||
def get_fieldsets(self, request, obj=None):
|
||||
if obj is None:
|
||||
return (
|
||||
(
|
||||
"Request",
|
||||
{
|
||||
"fields": (
|
||||
"requester",
|
||||
"server",
|
||||
"status",
|
||||
"reason",
|
||||
"expires_at",
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
return (
|
||||
(
|
||||
"Request",
|
||||
{
|
||||
"fields": (
|
||||
"requester",
|
||||
"server",
|
||||
"status",
|
||||
"reason",
|
||||
"expires_at",
|
||||
)
|
||||
},
|
||||
),
|
||||
(
|
||||
"Decision",
|
||||
{
|
||||
"fields": (
|
||||
"decided_at",
|
||||
"decided_by",
|
||||
)
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
def save_model(self, request, obj, form, change) -> None:
|
||||
if obj.status in {
|
||||
AccessRequest.Status.APPROVED,
|
||||
AccessRequest.Status.DENIED,
|
||||
AccessRequest.Status.REVOKED,
|
||||
AccessRequest.Status.CANCELLED,
|
||||
}:
|
||||
if not obj.decided_at:
|
||||
obj.decided_at = timezone.now()
|
||||
if not obj.decided_by_id and request.user and request.user.is_authenticated:
|
||||
obj.decided_by = request.user
|
||||
super().save_model(request, obj, form, change)
|
||||
|
||||
def delete_link(self, obj: AccessRequest):
|
||||
url = reverse("admin:access_accessrequest_delete", args=[obj.pk])
|
||||
return format_html('<a class="text-red-600" href="{}">Delete</a>', url)
|
||||
|
||||
delete_link.short_description = "Delete"
|
||||
|
||||
26
app/apps/access/permissions.py
Normal file
26
app/apps/access/permissions.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from django.db.models import Q
|
||||
from django.utils import timezone
|
||||
from guardian.shortcuts import assign_perm, remove_perm
|
||||
|
||||
from .models import AccessRequest
|
||||
|
||||
|
||||
def sync_server_view_perm(access_request: AccessRequest) -> None:
|
||||
if not access_request or not access_request.requester_id or not access_request.server_id:
|
||||
return
|
||||
now = timezone.now()
|
||||
has_valid_access = (
|
||||
AccessRequest.objects.filter(
|
||||
requester_id=access_request.requester_id,
|
||||
server_id=access_request.server_id,
|
||||
status=AccessRequest.Status.APPROVED,
|
||||
)
|
||||
.filter(Q(expires_at__isnull=True) | Q(expires_at__gt=now))
|
||||
.exists()
|
||||
)
|
||||
if has_valid_access:
|
||||
assign_perm("servers.view_server", access_request.requester, access_request.server)
|
||||
return
|
||||
remove_perm("servers.view_server", access_request.requester, access_request.server)
|
||||
@@ -6,11 +6,13 @@ from guardian.shortcuts import assign_perm
|
||||
|
||||
from apps.core.rbac import assign_default_object_permissions
|
||||
from .models import AccessRequest
|
||||
from .permissions import sync_server_view_perm
|
||||
|
||||
|
||||
@receiver(post_save, sender=AccessRequest)
|
||||
def assign_access_request_perms(sender, instance: AccessRequest, created: bool, **kwargs) -> None:
|
||||
if not created:
|
||||
sync_server_view_perm(instance)
|
||||
return
|
||||
if instance.requester_id:
|
||||
user = instance.requester
|
||||
@@ -21,3 +23,4 @@ def assign_access_request_perms(sender, instance: AccessRequest, created: bool,
|
||||
):
|
||||
assign_perm(perm, user, instance)
|
||||
assign_default_object_permissions(instance)
|
||||
sync_server_view_perm(instance)
|
||||
|
||||
27
app/apps/access/tasks.py
Normal file
27
app/apps/access/tasks.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from celery import shared_task
|
||||
from django.db import transaction
|
||||
from django.utils import timezone
|
||||
from .models import AccessRequest
|
||||
from .permissions import sync_server_view_perm
|
||||
|
||||
|
||||
@shared_task
|
||||
def expire_access_requests() -> int:
|
||||
now = timezone.now()
|
||||
expired_qs = AccessRequest.objects.select_related("server", "requester").filter(
|
||||
status=AccessRequest.Status.APPROVED,
|
||||
expires_at__isnull=False,
|
||||
expires_at__lte=now,
|
||||
)
|
||||
count = 0
|
||||
for access_request in expired_qs:
|
||||
with transaction.atomic():
|
||||
access_request.status = AccessRequest.Status.EXPIRED
|
||||
access_request.decided_at = now
|
||||
access_request.decided_by = None
|
||||
access_request.save(update_fields=["status", "decided_at", "decided_by"])
|
||||
sync_server_view_perm(access_request)
|
||||
count += 1
|
||||
return count
|
||||
@@ -1,5 +1,12 @@
|
||||
from django.contrib import admin
|
||||
from guardian.admin import GuardedModelAdmin
|
||||
try:
|
||||
from unfold.contrib.guardian.admin import GuardedModelAdmin
|
||||
except ImportError: # Fallback for older Unfold builds without guardian admin shim.
|
||||
from guardian.admin import GuardedModelAdmin as GuardianGuardedModelAdmin
|
||||
from unfold.admin import ModelAdmin as UnfoldModelAdmin
|
||||
|
||||
class GuardedModelAdmin(GuardianGuardedModelAdmin, UnfoldModelAdmin):
|
||||
pass
|
||||
|
||||
from .models import SSHKey
|
||||
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
from django.contrib import admin
|
||||
from django.utils.html import format_html
|
||||
from guardian.admin import GuardedModelAdmin
|
||||
try:
|
||||
from unfold.contrib.guardian.admin import GuardedModelAdmin
|
||||
except ImportError: # Fallback for older Unfold builds without guardian admin shim.
|
||||
from guardian.admin import GuardedModelAdmin as GuardianGuardedModelAdmin
|
||||
from unfold.admin import ModelAdmin as UnfoldModelAdmin
|
||||
|
||||
class GuardedModelAdmin(GuardianGuardedModelAdmin, UnfoldModelAdmin):
|
||||
pass
|
||||
|
||||
from .models import AgentCertificateAuthority, EnrollmentToken, Server
|
||||
|
||||
|
||||
@@ -5,13 +5,25 @@ from django.db.models import Q
|
||||
from django.http import Http404
|
||||
from django.shortcuts import render
|
||||
from django.utils import timezone
|
||||
from guardian.shortcuts import get_objects_for_user
|
||||
|
||||
from apps.access.models import AccessRequest
|
||||
from apps.servers.models import Server
|
||||
|
||||
|
||||
@login_required(login_url="/accounts/login/")
|
||||
def dashboard(request):
|
||||
now = timezone.now()
|
||||
if request.user.has_perm("servers.view_server"):
|
||||
server_qs = Server.objects.all()
|
||||
else:
|
||||
server_qs = get_objects_for_user(
|
||||
request.user,
|
||||
"servers.view_server",
|
||||
klass=Server,
|
||||
accept_global_perms=False,
|
||||
)
|
||||
|
||||
access_qs = (
|
||||
AccessRequest.objects.select_related("server")
|
||||
.filter(
|
||||
@@ -19,22 +31,24 @@ def dashboard(request):
|
||||
status=AccessRequest.Status.APPROVED,
|
||||
)
|
||||
.filter(Q(expires_at__isnull=True) | Q(expires_at__gt=now))
|
||||
.order_by("-requested_at")
|
||||
)
|
||||
|
||||
seen = set()
|
||||
servers = []
|
||||
expires_map = {}
|
||||
for access in access_qs:
|
||||
if access.server_id in seen:
|
||||
continue
|
||||
seen.add(access.server_id)
|
||||
servers.append(
|
||||
{
|
||||
"server": access.server,
|
||||
"expires_at": access.expires_at,
|
||||
"last_accessed": None,
|
||||
}
|
||||
)
|
||||
expires_at = access.expires_at
|
||||
current = expires_map.get(access.server_id)
|
||||
if current is None or expires_at is None:
|
||||
expires_map[access.server_id] = None
|
||||
elif current and expires_at and expires_at > current:
|
||||
expires_map[access.server_id] = expires_at
|
||||
|
||||
servers = [
|
||||
{
|
||||
"server": server,
|
||||
"expires_at": expires_map.get(server.id),
|
||||
"last_accessed": None,
|
||||
}
|
||||
for server in server_qs
|
||||
]
|
||||
|
||||
context = {
|
||||
"servers": servers,
|
||||
@@ -45,9 +59,17 @@ def dashboard(request):
|
||||
@login_required(login_url="/accounts/login/")
|
||||
def detail(request, server_id: int):
|
||||
now = timezone.now()
|
||||
try:
|
||||
server = Server.objects.get(id=server_id)
|
||||
except Server.DoesNotExist:
|
||||
raise Http404("Server not found")
|
||||
if not request.user.has_perm("servers.view_server", server) and not request.user.has_perm(
|
||||
"servers.view_server"
|
||||
):
|
||||
raise Http404("Server not found")
|
||||
|
||||
access = (
|
||||
AccessRequest.objects.select_related("server")
|
||||
.filter(
|
||||
AccessRequest.objects.filter(
|
||||
requester=request.user,
|
||||
server_id=server_id,
|
||||
status=AccessRequest.Status.APPROVED,
|
||||
@@ -56,12 +78,10 @@ def detail(request, server_id: int):
|
||||
.order_by("-requested_at")
|
||||
.first()
|
||||
)
|
||||
if not access:
|
||||
raise Http404("Server not found")
|
||||
|
||||
context = {
|
||||
"server": access.server,
|
||||
"expires_at": access.expires_at,
|
||||
"server": server,
|
||||
"expires_at": access.expires_at if access else None,
|
||||
"last_accessed": None,
|
||||
}
|
||||
return render(request, "servers/detail.html", context)
|
||||
|
||||
Reference in New Issue
Block a user