Ephemeral keys for xterm.js. Initial rework of audit logging. All endpoints now return a 401 regardless of presence if not logged in.
This commit is contained in:
0
app/apps/audit/tests/__init__.py
Normal file
0
app/apps/audit/tests/__init__.py
Normal file
86
app/apps/audit/tests/test_api_audit_middleware.py
Normal file
86
app/apps/audit/tests/test_api_audit_middleware.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory, TestCase
|
||||
|
||||
from apps.audit.matching import find_matching_event_type
|
||||
from apps.audit.middleware import ApiAuditLogMiddleware
|
||||
from apps.audit.models import AuditEventType, AuditLog
|
||||
|
||||
|
||||
class ApiAuditMiddlewareTests(TestCase):
|
||||
def setUp(self) -> None:
|
||||
super().setUp()
|
||||
self.factory = RequestFactory()
|
||||
self.middleware = ApiAuditLogMiddleware(lambda request: HttpResponse("ok"))
|
||||
|
||||
def _call(self, method: str, path: str, ip: str = "203.0.113.5") -> None:
|
||||
request = self.factory.generic(method, path)
|
||||
request.META["REMOTE_ADDR"] = ip
|
||||
self.middleware(request)
|
||||
|
||||
def test_no_matching_event_type_creates_no_logs_or_event_types(self) -> None:
|
||||
self._call("GET", "/api/auto/")
|
||||
self.assertEqual(AuditEventType.objects.count(), 0)
|
||||
self.assertEqual(AuditLog.objects.count(), 0)
|
||||
|
||||
def test_matching_event_type_creates_log(self) -> None:
|
||||
event_type = AuditEventType.objects.create(
|
||||
key="api_test",
|
||||
title="API test",
|
||||
kind=AuditEventType.Kind.API,
|
||||
endpoints=["/api/test/"],
|
||||
)
|
||||
self._call("GET", "/api/test/")
|
||||
log = AuditLog.objects.get()
|
||||
self.assertEqual(log.event_type_id, event_type.id)
|
||||
self.assertEqual(log.source, AuditLog.Source.API)
|
||||
self.assertEqual(log.severity, event_type.default_severity)
|
||||
|
||||
def test_ip_whitelist_blocks_and_allows(self) -> None:
|
||||
AuditEventType.objects.create(
|
||||
key="api_whitelist",
|
||||
title="API whitelist",
|
||||
kind=AuditEventType.Kind.API,
|
||||
endpoints=["/api/whitelist/"],
|
||||
ip_whitelist_enabled=True,
|
||||
ip_whitelist=["203.0.113.10"],
|
||||
)
|
||||
|
||||
self._call("GET", "/api/whitelist/", ip="203.0.113.5")
|
||||
self.assertEqual(AuditLog.objects.count(), 0)
|
||||
|
||||
self._call("GET", "/api/whitelist/", ip="203.0.113.10")
|
||||
self.assertEqual(AuditLog.objects.count(), 1)
|
||||
|
||||
def test_ip_blacklist_blocks(self) -> None:
|
||||
AuditEventType.objects.create(
|
||||
key="api_blacklist",
|
||||
title="API blacklist",
|
||||
kind=AuditEventType.Kind.API,
|
||||
endpoints=["/api/blacklist/"],
|
||||
ip_blacklist_enabled=True,
|
||||
ip_blacklist=["203.0.113.5"],
|
||||
)
|
||||
|
||||
self._call("GET", "/api/blacklist/", ip="203.0.113.5")
|
||||
self.assertEqual(AuditLog.objects.count(), 0)
|
||||
|
||||
|
||||
class AuditEventMatchingTests(TestCase):
|
||||
def test_websocket_event_type_can_match(self) -> None:
|
||||
event_type = AuditEventType.objects.create(
|
||||
key="ws_shell",
|
||||
title="WebSocket shell",
|
||||
kind=AuditEventType.Kind.WEBSOCKET,
|
||||
endpoints=["/ws/servers/*/shell/"],
|
||||
)
|
||||
matched = find_matching_event_type(
|
||||
kind=AuditEventType.Kind.WEBSOCKET,
|
||||
method="GET",
|
||||
route="/ws/servers/123/shell/",
|
||||
path="/ws/servers/123/shell/",
|
||||
ip="203.0.113.10",
|
||||
)
|
||||
self.assertIsNotNone(matched)
|
||||
self.assertEqual(matched.id, event_type.id)
|
||||
Reference in New Issue
Block a user