Merge new endpoints with development prototype #6

Merged
boris merged 2 commits from api-dev into dev 2026-01-19 18:40:21 +00:00
11 changed files with 451 additions and 165 deletions
Showing only changes of commit a3036f74fc - Show all commits

View File

@@ -0,0 +1,20 @@
from django.conf import settings
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("accounts", "0004_delete_account"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.RunSQL(
sql=(
"CREATE UNIQUE INDEX IF NOT EXISTS auth_user_email_uniq "
"ON auth_user (email);"
),
reverse_sql="DROP INDEX IF EXISTS auth_user_email_uniq;",
),
]

View File

@@ -1,3 +1,2 @@
from .main import api
from .main import api, api_v1

View File

@@ -4,10 +4,19 @@ from ninja import NinjaAPI, Router, Schema
from ninja.security import django_auth
from .security import JWTAuth
from .routers.accounts import router as accounts_router
from .routers.audit import router as audit_router
from .routers.system import router as system_router
from .routers.servers import router as servers_router
from .routers.accounts import build_router as build_accounts_router
from .routers.audit import build_router as build_audit_router
from .routers.system import build_router as build_system_router
from .routers.servers import build_router as build_servers_router
from .routers.users import build_router as build_users_router
def register_routers(target_api: NinjaAPI) -> None:
target_api.add_router("/system", build_system_router(), tags=["system"])
target_api.add_router("/user", build_accounts_router(), tags=["user"])
target_api.add_router("/audit", build_audit_router(), tags=["audit"])
target_api.add_router("/servers", build_servers_router(), tags=["servers"])
target_api.add_router("/users", build_users_router(), tags=["users"])
api = NinjaAPI(
@@ -17,11 +26,14 @@ api = NinjaAPI(
auth=[django_auth, JWTAuth()],
csrf=True, # enforce CSRF for session-authenticated unsafe requests
)
register_routers(api)
# Mount routers
api.add_router("/system", system_router, tags=["system"])
api.add_router("/user", accounts_router, tags=["user"])
api.add_router("/audit", audit_router, tags=["audit"])
api.add_router("/servers", servers_router, tags=["servers"])
api_v1 = NinjaAPI(
title="Keywarden API",
version="1.0.0",
description="Authenticated API for internal app use and external clients.",
auth=[django_auth, JWTAuth()],
csrf=True,
urls_namespace="api-v1",
)
register_routers(api_v1)

View File

@@ -3,8 +3,6 @@ from typing import Optional
from django.http import HttpRequest
from ninja import Router, Schema
router = Router()
class UserSchema(Schema):
id: int
@@ -16,8 +14,11 @@ class UserSchema(Schema):
is_superuser: bool
@router.get("/me", response=UserSchema)
def me(request: HttpRequest):
def build_router() -> Router:
router = Router()
@router.get("/me", response=UserSchema)
def me(request: HttpRequest):
user = request.user
return {
"id": user.id,
@@ -29,4 +30,8 @@ def me(request: HttpRequest):
"is_superuser": bool(user.is_superuser),
}
return router
router = build_router()

View File

@@ -9,9 +9,6 @@ from ninja import Query, Router, Schema
from apps.audit.models import AuditEventType, AuditLog
router = Router()
class AuditEventTypeSchema(Schema):
id: int
key: str
@@ -44,8 +41,11 @@ class LogsQuery(Schema):
source: Optional[str] = None
@router.get("/event-types", response=List[AuditEventTypeSchema])
def list_event_types(request: HttpRequest):
def build_router() -> Router:
router = Router()
@router.get("/event-types", response=List[AuditEventTypeSchema])
def list_event_types(request: HttpRequest):
qs: QuerySet[AuditEventType] = AuditEventType.objects.all()
return [
{
@@ -58,9 +58,8 @@ def list_event_types(request: HttpRequest):
for et in qs
]
@router.get("/logs", response=List[AuditLogSchema])
def list_logs(request: HttpRequest, filters: LogsQuery = Query(...)):
@router.get("/logs", response=List[AuditLogSchema])
def list_logs(request: HttpRequest, filters: LogsQuery = Query(...)):
qs: QuerySet[AuditLog] = AuditLog.objects.select_related("event_type", "actor").all()
if filters.severity:
qs = qs.filter(severity=filters.severity)
@@ -89,4 +88,8 @@ def list_logs(request: HttpRequest, filters: LogsQuery = Query(...)):
for al in qs
]
return router
router = build_router()

View File

@@ -2,13 +2,13 @@ from __future__ import annotations
from typing import List, Optional
from django.db import IntegrityError
from django.http import HttpRequest
from ninja import Router, Schema, File, Form
from ninja import File, Form, Router, Schema
from ninja.files import UploadedFile
from ninja.errors import HttpError
from apps.servers.models import Server
router = Router()
class ServerOut(Schema):
id: int
@@ -27,8 +27,26 @@ class ServerCreate(Schema):
ipv6: Optional[str] = None
@router.get("/", response=List[ServerOut])
def list_servers(request: HttpRequest):
class ServerUpdate(Schema):
display_name: Optional[str] = None
hostname: Optional[str] = None
ipv4: Optional[str] = None
ipv6: Optional[str] = None
def _require_admin(request: HttpRequest) -> None:
user = request.user
if not getattr(user, "is_authenticated", False):
raise HttpError(403, "Forbidden")
if not (user.is_staff or user.is_superuser):
raise HttpError(403, "Forbidden")
def build_router() -> Router:
router = Router()
@router.get("/", response=List[ServerOut])
def list_servers(request: HttpRequest):
servers = Server.objects.all()
return [
{
@@ -43,9 +61,25 @@ def list_servers(request: HttpRequest):
for s in servers
]
@router.get("/{server_id}", response=ServerOut)
def get_server(request: HttpRequest, server_id: int):
try:
server = Server.objects.get(id=server_id)
except Server.DoesNotExist:
raise HttpError(404, "Not Found")
return {
"id": server.id,
"display_name": server.display_name,
"hostname": server.hostname,
"ipv4": server.ipv4,
"ipv6": server.ipv6,
"image_url": server.image_url,
"initial": server.initial,
}
@router.post("/", response=ServerOut)
def create_server_json(request: HttpRequest, payload: ServerCreate):
@router.post("/", response=ServerOut)
def create_server_json(request: HttpRequest, payload: ServerCreate):
_require_admin(request)
server = Server.objects.create(
display_name=payload.display_name.strip(),
hostname=(payload.hostname or "").strip() or None,
@@ -62,16 +96,16 @@ def create_server_json(request: HttpRequest, payload: ServerCreate):
"initial": server.initial,
}
@router.post("/upload", response=ServerOut)
def create_server_multipart(
@router.post("/upload", response=ServerOut)
def create_server_multipart(
request: HttpRequest,
display_name: str = Form(...),
hostname: Optional[str] = Form(None),
ipv4: Optional[str] = Form(None),
ipv6: Optional[str] = Form(None),
image: Optional[UploadedFile] = File(None),
):
):
_require_admin(request)
server = Server(
display_name=display_name.strip(),
hostname=(hostname or "").strip() or None,
@@ -91,4 +125,56 @@ def create_server_multipart(
"initial": server.initial,
}
@router.patch("/{server_id}", response=ServerOut)
def update_server(request: HttpRequest, server_id: int, payload: ServerUpdate):
_require_admin(request)
if (
payload.display_name is None
and payload.hostname is None
and payload.ipv4 is None
and payload.ipv6 is None
):
raise HttpError(422, {"detail": "No fields provided."})
try:
server = Server.objects.get(id=server_id)
except Server.DoesNotExist:
raise HttpError(404, "Not Found")
if payload.display_name is not None:
display_name = payload.display_name.strip()
if not display_name:
raise HttpError(422, {"display_name": ["Display name cannot be empty."]})
server.display_name = display_name
if payload.hostname is not None:
server.hostname = (payload.hostname or "").strip() or None
if payload.ipv4 is not None:
server.ipv4 = (payload.ipv4 or "").strip() or None
if payload.ipv6 is not None:
server.ipv6 = (payload.ipv6 or "").strip() or None
try:
server.save()
except IntegrityError:
raise HttpError(422, {"detail": "Unique constraint violated."})
return {
"id": server.id,
"display_name": server.display_name,
"hostname": server.hostname,
"ipv4": server.ipv4,
"ipv6": server.ipv6,
"image_url": server.image_url,
"initial": server.initial,
}
@router.delete("/{server_id}", response={204: None})
def delete_server(request: HttpRequest, server_id: int):
_require_admin(request)
try:
server = Server.objects.get(id=server_id)
except Server.DoesNotExist:
raise HttpError(404, "Not Found")
server.delete()
return 204, None
return router
router = build_router()

View File

@@ -2,15 +2,20 @@ from typing import Literal, TypedDict
from ninja import Router
router = Router()
class HealthResponse(TypedDict):
status: Literal["ok"]
@router.get("/health", response=HealthResponse)
def health() -> HealthResponse:
def build_router() -> Router:
router = Router()
@router.get("/health", response=HealthResponse)
def health() -> HealthResponse:
return {"status": "ok"}
return router
router = build_router()

View File

@@ -0,0 +1,164 @@
from __future__ import annotations
from typing import List, Literal
from django.contrib.auth import get_user_model
from django.db import IntegrityError
from django.http import HttpRequest
from ninja import Query, Router, Schema
from ninja.errors import HttpError
from pydantic import EmailStr, Field
class UserCreateIn(Schema):
email: EmailStr
password: str = Field(min_length=8)
role: Literal["admin", "user"]
class UserListOut(Schema):
id: int
email: str
role: str
is_active: bool
class UserDetailOut(Schema):
id: int
email: str
role: str
is_active: bool
class UserUpdateIn(Schema):
email: EmailStr | None = None
password: str | None = Field(default=None, min_length=8)
role: Literal["admin", "user"] | None = None
is_active: bool | None = None
class UsersQuery(Schema):
limit: int = Field(default=50, ge=1, le=200)
offset: int = Field(default=0, ge=0)
def _require_admin(request: HttpRequest) -> None:
user = request.user
if not getattr(user, "is_authenticated", False):
raise HttpError(403, "Forbidden")
if not (user.is_staff or user.is_superuser):
raise HttpError(403, "Forbidden")
def _role_from_user(user) -> str:
return "admin" if (user.is_staff or user.is_superuser) else "user"
def _apply_role(user, role: str) -> None:
if role == "admin":
user.is_staff = True
user.is_superuser = True
else:
user.is_staff = False
user.is_superuser = False
def build_router() -> Router:
router = Router()
@router.post("/", response=UserDetailOut)
def create_user(request: HttpRequest, payload: UserCreateIn):
_require_admin(request)
User = get_user_model()
email = payload.email.strip().lower()
if User.objects.filter(email__iexact=email).exists():
raise HttpError(422, {"email": ["Email already exists."]})
user = User(username=email, email=email, is_active=True)
_apply_role(user, payload.role)
user.set_password(payload.password)
try:
user.save()
except IntegrityError:
raise HttpError(422, {"email": ["Email already exists."]})
return {
"id": user.id,
"email": user.email,
"role": _role_from_user(user),
"is_active": user.is_active,
}
@router.get("/", response=List[UserListOut])
def list_users(request: HttpRequest, pagination: UsersQuery = Query(...)):
_require_admin(request)
User = get_user_model()
qs = User.objects.order_by("id")[pagination.offset : pagination.offset + pagination.limit]
return [
{
"id": user.id,
"email": user.email or "",
"role": _role_from_user(user),
"is_active": user.is_active,
}
for user in qs
]
@router.get("/{user_id}", response=UserDetailOut)
def get_user(request: HttpRequest, user_id: int):
_require_admin(request)
User = get_user_model()
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
raise HttpError(404, "Not Found")
return {
"id": user.id,
"email": user.email or "",
"role": _role_from_user(user),
"is_active": user.is_active,
}
@router.patch("/{user_id}", response=UserDetailOut)
def update_user(request: HttpRequest, user_id: int, payload: UserUpdateIn):
_require_admin(request)
if payload.email is None and payload.password is None and payload.role is None and payload.is_active is None:
raise HttpError(422, {"detail": "No fields provided."})
User = get_user_model()
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
raise HttpError(404, "Not Found")
if payload.email is not None:
email = payload.email.strip().lower()
if User.objects.filter(email__iexact=email).exclude(id=user_id).exists():
raise HttpError(422, {"email": ["Email already exists."]})
user.email = email
user.username = email
if payload.password is not None:
user.set_password(payload.password)
if payload.role is not None:
_apply_role(user, payload.role)
if payload.is_active is not None:
user.is_active = payload.is_active
user.save()
return {
"id": user.id,
"email": user.email or "",
"role": _role_from_user(user),
"is_active": user.is_active,
}
@router.delete("/{user_id}", response={204: None})
def delete_user(request: HttpRequest, user_id: int):
_require_admin(request)
User = get_user_model()
try:
user = User.objects.get(id=user_id)
except User.DoesNotExist:
raise HttpError(404, "Not Found")
user.delete()
return 204, None
return router
router = build_router()

View File

@@ -86,6 +86,14 @@ CACHES = {
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"
PASSWORD_HASHERS = [
"django.contrib.auth.hashers.Argon2PasswordHasher",
"django.contrib.auth.hashers.PBKDF2PasswordHasher",
"django.contrib.auth.hashers.PBKDF2SHA1PasswordHasher",
"django.contrib.auth.hashers.BCryptSHA256PasswordHasher",
"django.contrib.auth.hashers.ScryptPasswordHasher",
]
STATIC_URL = "/static/"
STATIC_ROOT = BASE_DIR/"static"
STATICFILES_STORAGE = "whitenoise.storage.CompressedManifestStaticFilesStorage"
@@ -144,25 +152,6 @@ UNFOLD = {
"SCRIPTS": [
"/static/unfold/js/simplebar.js",
],
"SITE_DROPDOWN": [
{
"icon": "diamond",
"title": _("Keywarden Development"),
"link": "https://keywarden.dev.ntbx.io",
"attrs": {
"target": "_blank",
},
},
{
"icon": "diamond",
"title": _("Keywarden [Inactive]"),
"link": "https://keywarden.ntbx.io",
"attrs": {
"target": "_blank",
},
},
],
"TABS": [
{
"models": [

View File

@@ -2,7 +2,7 @@ from django.contrib import admin
from django.urls import path, include
from django.views.generic import RedirectView
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
from keywarden.api import api as ninja_api
from keywarden.api import api as ninja_api, api_v1 as ninja_api_v1
urlpatterns = [
path("admin/", admin.site.urls),
@@ -10,6 +10,7 @@ urlpatterns = [
path("accounts/", include("apps.accounts.urls")),
# API
path("api/", ninja_api.urls),
path("api/v1/", ninja_api_v1.urls),
path("api/auth/jwt/create/", TokenObtainPairView.as_view(), name="jwt-create"),
path("api/auth/jwt/refresh/", TokenRefreshView.as_view(), name="jwt-refresh"),
path("", RedirectView.as_view(pattern_name="accounts:login", permanent=False)),

View File

@@ -7,6 +7,7 @@ Pillow>=10.0.0
mozilla-django-oidc>=4.0.0
django-unfold>=0.70.0
django-tailwind==4.4.0
argon2-cffi>=23.1.0
psycopg2-binary>=2.9.9
gunicorn==23.0.0
paramiko==4.0.0
@@ -16,3 +17,4 @@ python-dotenv>=1.2
whitenoise>=6.6
cookiecutter>=2.6
distlib>=0.3.8
email-validator>=2.1.0