171 lines
6.2 KiB
Python
171 lines
6.2 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import binascii
|
|
import hashlib
|
|
import os
|
|
import subprocess
|
|
import tempfile
|
|
|
|
from django.conf import settings
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import models
|
|
from django.utils import timezone
|
|
|
|
|
|
def parse_public_key(public_key: str) -> tuple[str, str, str]:
|
|
trimmed = (public_key or "").strip()
|
|
parts = trimmed.split()
|
|
if len(parts) < 2:
|
|
raise ValidationError("Invalid SSH public key format.")
|
|
key_type, key_b64 = parts[0], parts[1]
|
|
try:
|
|
key_bytes = base64.b64decode(key_b64.encode("ascii"), validate=True)
|
|
except (binascii.Error, ValueError) as exc:
|
|
raise ValidationError("Invalid SSH public key format.") from exc
|
|
digest = hashlib.sha256(key_bytes).digest()
|
|
fingerprint = "SHA256:" + base64.b64encode(digest).decode("ascii").rstrip("=")
|
|
return key_type, key_b64, fingerprint
|
|
|
|
|
|
class SSHKey(models.Model):
|
|
user = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="ssh_keys"
|
|
)
|
|
name = models.CharField(max_length=128)
|
|
public_key = models.TextField()
|
|
key_type = models.CharField(max_length=32)
|
|
fingerprint = models.CharField(max_length=128, db_index=True)
|
|
is_active = models.BooleanField(default=True, db_index=True)
|
|
created_at = models.DateTimeField(default=timezone.now, editable=False)
|
|
revoked_at = models.DateTimeField(null=True, blank=True)
|
|
last_used_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
class Meta:
|
|
verbose_name = "SSH key"
|
|
verbose_name_plural = "SSH keys"
|
|
indexes = [
|
|
models.Index(fields=["user", "is_active"], name="keys_user_active_idx"),
|
|
models.Index(fields=["fingerprint"], name="keys_fingerprint_idx"),
|
|
]
|
|
constraints = [
|
|
models.UniqueConstraint(
|
|
fields=["user", "fingerprint"], name="unique_user_key_fingerprint"
|
|
)
|
|
]
|
|
ordering = ["-created_at"]
|
|
|
|
def set_public_key(self, public_key: str) -> None:
|
|
key_type, key_b64, fingerprint = parse_public_key(public_key)
|
|
self.key_type = key_type
|
|
self.fingerprint = fingerprint
|
|
self.public_key = f"{key_type} {key_b64}"
|
|
|
|
def revoke(self) -> None:
|
|
self.is_active = False
|
|
self.revoked_at = timezone.now()
|
|
try:
|
|
cert = self.certificate
|
|
except SSHCertificate.DoesNotExist:
|
|
return
|
|
cert.revoke()
|
|
cert.save(update_fields=["is_active", "revoked_at"])
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.name} ({self.user_id})"
|
|
|
|
|
|
class SSHCertificateAuthority(models.Model):
|
|
name = models.CharField(max_length=128, default="Keywarden User SSH CA")
|
|
public_key = models.TextField(blank=True)
|
|
private_key = models.TextField(blank=True)
|
|
fingerprint = models.CharField(max_length=128, blank=True)
|
|
created_at = models.DateTimeField(default=timezone.now, editable=False)
|
|
revoked_at = models.DateTimeField(null=True, blank=True)
|
|
is_active = models.BooleanField(default=True, db_index=True)
|
|
created_by = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL,
|
|
null=True,
|
|
blank=True,
|
|
on_delete=models.SET_NULL,
|
|
related_name="ssh_certificate_authorities",
|
|
)
|
|
|
|
class Meta:
|
|
verbose_name = "SSH certificate authority"
|
|
verbose_name_plural = "SSH certificate authorities"
|
|
ordering = ["-created_at"]
|
|
|
|
def __str__(self) -> str:
|
|
status = "active" if self.is_active and not self.revoked_at else "revoked"
|
|
return f"{self.name} ({status})"
|
|
|
|
def revoke(self) -> None:
|
|
self.is_active = False
|
|
self.revoked_at = timezone.now()
|
|
|
|
def ensure_material(self) -> None:
|
|
if self.public_key and self.private_key:
|
|
if not self.fingerprint:
|
|
_, _, fingerprint = parse_public_key(self.public_key)
|
|
self.fingerprint = fingerprint
|
|
return
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
key_path = os.path.join(tmpdir, "keywarden_user_ca")
|
|
cmd = [
|
|
"ssh-keygen",
|
|
"-t",
|
|
"ed25519",
|
|
"-f",
|
|
key_path,
|
|
"-C",
|
|
self.name,
|
|
"-N",
|
|
"",
|
|
]
|
|
try:
|
|
subprocess.run(cmd, check=True, capture_output=True)
|
|
except FileNotFoundError as exc:
|
|
raise RuntimeError("ssh-keygen not available") from exc
|
|
except subprocess.CalledProcessError as exc:
|
|
raise RuntimeError(f"ssh-keygen failed: {exc.stderr.decode('utf-8', 'ignore')}") from exc
|
|
with open(key_path, "r", encoding="utf-8") as handle:
|
|
self.private_key = handle.read()
|
|
with open(key_path + ".pub", "r", encoding="utf-8") as handle:
|
|
self.public_key = handle.read().strip()
|
|
_, _, fingerprint = parse_public_key(self.public_key)
|
|
self.fingerprint = fingerprint
|
|
|
|
|
|
class SSHCertificate(models.Model):
|
|
key = models.OneToOneField(
|
|
SSHKey, on_delete=models.CASCADE, related_name="certificate"
|
|
)
|
|
user = models.ForeignKey(
|
|
settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="ssh_certificates"
|
|
)
|
|
certificate = models.TextField()
|
|
serial = models.BigIntegerField()
|
|
principals = models.JSONField(default=list, blank=True)
|
|
valid_after = models.DateTimeField()
|
|
valid_before = models.DateTimeField()
|
|
created_at = models.DateTimeField(default=timezone.now, editable=False)
|
|
revoked_at = models.DateTimeField(null=True, blank=True)
|
|
is_active = models.BooleanField(default=True, db_index=True)
|
|
|
|
class Meta:
|
|
verbose_name = "SSH certificate"
|
|
verbose_name_plural = "SSH certificates"
|
|
indexes = [
|
|
models.Index(fields=["user", "is_active"], name="keys_cert_user_active_idx"),
|
|
models.Index(fields=["valid_before"], name="keys_cert_valid_before_idx"),
|
|
]
|
|
ordering = ["-created_at"]
|
|
|
|
def revoke(self) -> None:
|
|
self.is_active = False
|
|
self.revoked_at = timezone.now()
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.user_id} ({self.serial})"
|