From 962ba276798f37c8cfa8cd4ff6f28d395d2311d2 Mon Sep 17 00:00:00 2001 From: boris Date: Tue, 3 Feb 2026 09:33:49 +0000 Subject: [PATCH] Commented terminal files --- app/apps/keys/certificates.py | 10 ++++++++++ app/apps/keys/utils.py | 4 ++++ app/apps/servers/consumers.py | 11 +++++++++++ app/keywarden/settings/base.py | 7 +++++++ 4 files changed, 32 insertions(+) diff --git a/app/apps/keys/certificates.py b/app/apps/keys/certificates.py index cb7e957..484b33e 100644 --- a/app/apps/keys/certificates.py +++ b/app/apps/keys/certificates.py @@ -15,6 +15,7 @@ from .utils import render_system_username def get_active_ca(created_by=None) -> SSHCertificateAuthority: + # Reuse the most recent active CA, or lazily create one if missing. ca = ( SSHCertificateAuthority.objects.filter(is_active=True, revoked_at__isnull=True) .order_by("-created_at") @@ -31,9 +32,11 @@ def issue_certificate_for_key(key: SSHKey, created_by=None) -> SSHCertificate: if not key or not key.user_id: raise ValueError("key must have a user") ca = get_active_ca(created_by=created_by) + # Principal must match the system account used for SSH logins. principal = render_system_username(key.user.username, key.user_id) now = timezone.now() valid_before = now + timedelta(days=settings.KEYWARDEN_USER_CERT_VALIDITY_DAYS) + # Serial should be unique and non-guessable for audit purposes. serial = secrets.randbits(63) safe_name = _sanitize_label(key.name or "key") identity = f"keywarden-cert-{key.user_id}-{safe_name}-{key.id}" @@ -70,6 +73,7 @@ def revoke_certificate_for_key(key: SSHKey) -> None: cert = key.certificate except SSHCertificate.DoesNotExist: return + # Mark the cert as revoked but keep the record for audit/history. cert.revoke() cert.save(update_fields=["is_active", "revoked_at"]) @@ -87,6 +91,7 @@ def _sign_public_key( ) -> str: if not ca_private_key or not ca_public_key: raise RuntimeError("CA material missing") + # Write key material into a temp dir to avoid persisting secrets. with tempfile.TemporaryDirectory() as tmpdir: ca_path = os.path.join(tmpdir, "user_ca") pubkey_path = os.path.join(tmpdir, "user.pub") @@ -94,6 +99,7 @@ def _sign_public_key( _write_file(ca_path + ".pub", ca_public_key.strip() + "\n", 0o644) pubkey_with_comment = _ensure_comment(public_key, comment) _write_file(pubkey_path, pubkey_with_comment + "\n", 0o644) + # Use ssh-keygen to sign the public key with the CA. cmd = [ "ssh-keygen", "-s", @@ -114,6 +120,7 @@ def _sign_public_key( raise RuntimeError("ssh-keygen not available") from exc except subprocess.CalledProcessError as exc: raise RuntimeError(f"ssh-keygen failed: {exc.stderr.decode('utf-8', 'ignore')}") from exc + # ssh-keygen writes the cert alongside the input pubkey. cert_path = pubkey_path if cert_path.endswith(".pub"): cert_path = cert_path[: -len(".pub")] @@ -126,6 +133,7 @@ def _sign_public_key( def _ensure_comment(public_key: str, comment: str) -> str: + # Preserve the key type and base64 payload; replace/append only the comment. parts = (public_key or "").strip().split() if len(parts) < 2: return public_key.strip() @@ -136,6 +144,7 @@ def _ensure_comment(public_key: str, comment: str) -> str: def _sanitize_label(value: str) -> str: + # Reduce label to a safe, lowercase token for certificate identity. cleaned = re.sub(r"[^a-zA-Z0-9_-]+", "-", (value or "").strip()) cleaned = cleaned.strip("-_") if cleaned: @@ -146,4 +155,5 @@ def _sanitize_label(value: str) -> str: def _write_file(path: str, data: str, mode: int) -> None: with open(path, "w", encoding="utf-8") as handle: handle.write(data) + # Apply explicit permissions for key material. os.chmod(path, mode) diff --git a/app/apps/keys/utils.py b/app/apps/keys/utils.py index 7f07546..ebf8dcb 100644 --- a/app/apps/keys/utils.py +++ b/app/apps/keys/utils.py @@ -9,6 +9,7 @@ _SANITIZE_RE = re.compile(r"[^a-z0-9_-]") def render_system_username(username: str, user_id: int) -> str: + # Render from template and then sanitize to an OS-safe username. template = settings.KEYWARDEN_ACCOUNT_USERNAME_TEMPLATE raw = template.replace("{{username}}", username or "") raw = raw.replace("{{user_id}}", str(user_id)) @@ -17,13 +18,16 @@ def render_system_username(username: str, user_id: int) -> str: cleaned = cleaned[:MAX_USERNAME_LEN] if cleaned: return cleaned + # Fall back to a deterministic, non-empty username. return f"kw_{user_id}" def sanitize_username(raw: str) -> str: + # Normalize to lowercase and replace disallowed characters. raw = (raw or "").lower() raw = _SANITIZE_RE.sub("_", raw) raw = raw.strip("-_") if raw.startswith("-"): + # Avoid leading dash, which can be interpreted as a CLI flag. return "kw" + raw return raw diff --git a/app/apps/servers/consumers.py b/app/apps/servers/consumers.py index bf3b2d1..727d160 100644 --- a/app/apps/servers/consumers.py +++ b/app/apps/servers/consumers.py @@ -35,6 +35,7 @@ class ShellConsumer(AsyncWebsocketConsumer): self.shell_target = "" self.server_id: int | None = None + # Reject unauthenticated connections before any side effects. user = self.scope.get("user") if not user or not getattr(user, "is_authenticated", False): await self.close(code=4401) @@ -54,6 +55,7 @@ class ShellConsumer(AsyncWebsocketConsumer): if not can_shell: await self.close(code=4403) return + # Resolve the per-user system account name and the best reachable host. system_username = await self._get_system_username(user, server) shell_target = server.hostname or server.ipv4 or server.ipv6 if not system_username or not shell_target: @@ -62,6 +64,7 @@ class ShellConsumer(AsyncWebsocketConsumer): self.system_username = system_username self.shell_target = shell_target + # Only accept the socket after all authn/authz checks have passed. await self.accept() # Audit the WebSocket connection as an explicit, opt-in event. await self._audit_websocket_event(user=user, action="connect", metadata={"server_id": server.id}) @@ -96,6 +99,7 @@ class ShellConsumer(AsyncWebsocketConsumer): async def receive(self, text_data=None, bytes_data=None): if not self.proc or not self.proc.stdin: return + # Forward WebSocket payloads directly to the SSH subprocess stdin. if bytes_data is not None: data = bytes_data elif text_data is not None: @@ -109,6 +113,7 @@ class ShellConsumer(AsyncWebsocketConsumer): async def _start_ssh(self, user): # Generate a short-lived keypair + SSH certificate and then # bridge the WebSocket to an SSH subprocess. + # Prefer tmpfs when available so the private key never hits disk. temp_base = "/dev/shm" if os.path.isdir("/dev/shm") and os.access("/dev/shm", os.W_OK) else None self.tempdir = tempfile.TemporaryDirectory(prefix="keywarden-shell-", dir=temp_base) key_path, cert_path = await asyncio.to_thread( @@ -118,6 +123,7 @@ class ShellConsumer(AsyncWebsocketConsumer): self.system_username, ) ssh_host = _format_ssh_host(self.shell_target) + # Use a locked-down, non-interactive SSH invocation suitable for websockets. command = [ "ssh", "-tt", @@ -154,6 +160,7 @@ class ShellConsumer(AsyncWebsocketConsumer): stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, ) + # Delete key material immediately after the SSH process has it open. for path in (key_path, cert_path, f"{key_path}.pub"): try: os.remove(path) @@ -166,6 +173,7 @@ class ShellConsumer(AsyncWebsocketConsumer): async def _stream_output(self): if not self.proc or not self.proc.stdout: return + # Pump subprocess output until EOF, then close the socket. while True: chunk = await self.proc.stdout.read(4096) if not chunk: @@ -228,6 +236,7 @@ class ShellConsumer(AsyncWebsocketConsumer): metadata=combined_metadata, ) except Exception: + # Auditing is best-effort; never fail the shell session. return @@ -255,6 +264,7 @@ def _generate_session_keypair(tempdir: str, user, principal: str) -> tuple[str, raise RuntimeError("ssh-keygen not available") from exc except subprocess.CalledProcessError as exc: raise RuntimeError(f"ssh-keygen failed: {exc.stderr.decode('utf-8', 'ignore')}") from exc + # Restrict filesystem access to the private key. os.chmod(key_path, 0o600) pubkey_path = key_path + ".pub" with open(pubkey_path, "r", encoding="utf-8") as handle: @@ -273,6 +283,7 @@ def _generate_session_keypair(tempdir: str, user, principal: str) -> tuple[str, cert_path = key_path + "-cert.pub" with open(cert_path, "w", encoding="utf-8") as handle: handle.write(cert_text + "\n") + # Public cert is safe to be world-readable. os.chmod(cert_path, 0o644) return key_path, cert_path diff --git a/app/keywarden/settings/base.py b/app/keywarden/settings/base.py index 0554199..4fc2db8 100644 --- a/app/keywarden/settings/base.py +++ b/app/keywarden/settings/base.py @@ -6,6 +6,7 @@ from django.urls import reverse_lazy from django.templatetags.static import static from django.utils.translation import gettext_lazy as _ +# Load environment overrides early so settings can reference them. load_dotenv() BASE_DIR = Path(__file__).resolve().parent.parent.parent @@ -20,6 +21,7 @@ CSRF_TRUSTED_ORIGINS = [ if origin.strip() ] +# Default to secure cookies and respect TLS termination headers. SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https") CSRF_COOKIE_SECURE = True SESSION_COOKIE_SECURE = True @@ -94,6 +96,7 @@ CACHES = { } } +# In-memory channel layer keeps local development simple. CHANNEL_LAYERS = { "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}, } @@ -101,6 +104,7 @@ CHANNEL_LAYERS = { SESSION_ENGINE = "django.contrib.sessions.backends.cache" SESSION_CACHE_ALIAS = "default" +# Certificate validity defaults; can be tightened via env vars. KEYWARDEN_AGENT_CERT_VALIDITY_DAYS = int(os.getenv("KEYWARDEN_AGENT_CERT_VALIDITY_DAYS", "90")) KEYWARDEN_USER_CERT_VALIDITY_DAYS = int(os.getenv("KEYWARDEN_USER_CERT_VALIDITY_DAYS", "30")) KEYWARDEN_SHELL_CERT_VALIDITY_MINUTES = int(os.getenv("KEYWARDEN_SHELL_CERT_VALIDITY_MINUTES", "15")) @@ -178,6 +182,7 @@ UNFOLD = { "ENVIRONMENT": "Keywarden", "ENVIRONMENT_COLOR": "#7C3AED", "SHOW_VIEW_ON_SITE": True, + # Force a consistent admin theme; disables theme switching. "THEME": "dark", # Force theme: "dark" or "light". Will disable theme switcher "SIDEBAR": { "show_search": True, @@ -250,6 +255,7 @@ if AUTH_MODE not in {"native", "oidc", "hybrid"}: KEYWARDEN_AUTH_MODE = AUTH_MODE if AUTH_MODE == "oidc": + # OIDC-only: enforce identity provider logins. AUTHENTICATION_BACKENDS = [ "django.contrib.auth.backends.ModelBackend", "guardian.backends.ObjectPermissionBackend", @@ -271,4 +277,5 @@ LOGOUT_REDIRECT_URL = "/" ANONYMOUS_USER_NAME = None def permission_callback(request): + # Guard admin-side model changes behind a single permission check. return request.user.has_perm("keywarden.change_model")