Coverage for app_modules/audit.py: 88%
33 statements
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
« prev ^ index » next coverage.py v7.10.4, created at 2025-08-20 00:55 +0200
1"""
2audit.py
4Purpose:
5 Centralized audit logging for authentication events and administrative
6 actions. This module provides small, focused helpers to record security-
7 relevant events into the SQLite database, so operators have traceability for
8 incident response.
10How it works:
11 - Lazily ensures minimal schemas for two tables: `auth_log` and
12 `admin_audit`.
13 - Exposes `log_auth_event(...)` and `log_admin_action(...)` with simple
14 parameters to record events. Timestamps are stored as UNIX epoch seconds.
15 - Keeps the code path lightweight; failures to log never break requests.
17Data model:
18 - auth_log(id, ts, ip, user_id, username, event, detail)
19 - admin_audit(id, ts, ip, admin_user_id, target_user_id, action, detail)
20"""
22from __future__ import annotations
24import time
25from typing import Optional
27from .db import get_db
def _now() -> int:
    """Return the current time as whole UNIX epoch seconds."""
    current = time.time()
    # int() truncates the fractional part of the float timestamp.
    return int(current)
def _ensure_schema() -> None:
    """Create the ``auth_log`` and ``admin_audit`` tables if they are missing.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this
    repeatedly is harmless. The connection returned by ``get_db()`` is
    committed once after both statements have run.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS auth_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            ip TEXT,
            user_id INTEGER,
            username TEXT,
            event TEXT NOT NULL,
            detail TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS admin_audit (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            ip TEXT,
            admin_user_id INTEGER NOT NULL,
            target_user_id INTEGER,
            action TEXT NOT NULL,
            detail TEXT
        )
        """,
    )
    conn = get_db()
    # Order matches the data model listing: auth_log first, then admin_audit.
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()
def log_auth_event(event: str, *, username: Optional[str] = None, user_id: Optional[int] = None,
                   ip: Optional[str] = None, detail: Optional[str] = None) -> None:
    """Record an authentication event in the ``auth_log`` table (best-effort).

    Args:
        event: Event name, stored in the ``event`` column.
        username: Value for the ``username`` column, if any.
        user_id: Value for the ``user_id`` column, if any.
        ip: Value for the ``ip`` column, if any.
        detail: Free-form text for the ``detail`` column, if any.

    Returns:
        None. The row is timestamped with ``_now()`` (UNIX epoch seconds).

    Any ``Exception`` raised while ensuring the schema, inserting, or
    committing is swallowed so the caller is never interrupted; the failure
    is reported as a WARNING (with traceback) through the standard
    ``logging`` module instead of being dropped silently.
    """
    try:
        _ensure_schema()
        db = get_db()
        db.execute(
            'INSERT INTO auth_log (ts, ip, user_id, username, event, detail) VALUES (?, ?, ?, ?, ?, ?)',
            (_now(), ip, user_id, username, event, detail),
        )
        db.commit()
    except Exception:
        # Auditing must be best-effort only; never break requests. Still leave
        # a trace so operators can notice that the audit trail has gaps.
        # Only the event name is logged; username/detail may be sensitive.
        import logging

        logging.getLogger(__name__).warning(
            'Failed to record auth event %r', event, exc_info=True
        )
def log_admin_action(action: str, *, admin_user_id: int, target_user_id: Optional[int] = None,
                     ip: Optional[str] = None, detail: Optional[str] = None) -> None:
    """Record an administrative action in the ``admin_audit`` table (best-effort).

    Args:
        action: Action name, stored in the ``action`` column.
        admin_user_id: Value for the ``admin_user_id`` column (declared
            ``NOT NULL`` in the schema).
        target_user_id: Value for the ``target_user_id`` column, if any.
        ip: Value for the ``ip`` column, if any.
        detail: Free-form text for the ``detail`` column, if any.

    Returns:
        None. The row is timestamped with ``_now()`` (UNIX epoch seconds).

    Any ``Exception`` raised while ensuring the schema, inserting, or
    committing is swallowed so the caller is never interrupted; the failure
    is reported as a WARNING (with traceback) through the standard
    ``logging`` module instead of being dropped silently.
    """
    try:
        _ensure_schema()
        db = get_db()
        db.execute(
            'INSERT INTO admin_audit (ts, ip, admin_user_id, target_user_id, action, detail) VALUES (?, ?, ?, ?, ?, ?)',
            (_now(), ip, admin_user_id, target_user_id, action, detail),
        )
        db.commit()
    except Exception:
        # Auditing must be best-effort only; never break requests. Still leave
        # a trace so operators can notice that the audit trail has gaps.
        # Only the action name is logged; detail may be sensitive.
        import logging

        logging.getLogger(__name__).warning(
            'Failed to record admin action %r', action, exc_info=True
        )
def get_request_ip(request) -> Optional[str]:
    """Return the client IP for *request*, or ``None`` if it cannot be read.

    The first comma-separated entry of the ``X-Forwarded-For`` header is
    used when it is non-empty after trimming whitespace; otherwise the
    function falls back to ``request.remote_addr``. Any exception raised
    while inspecting the request results in ``None``.

    NOTE(review): the header value is used as-is; nothing in this function
    checks that it was set by a trusted proxy -- confirm this is handled
    upstream.
    """
    try:
        header_value = request.headers.get('X-Forwarded-For')
        if not header_value:
            header_value = ''
        # Only the text before the first comma is considered.
        first_hop, _, _ = header_value.partition(',')
        client_ip = first_hop.strip()
        if client_ip:
            return client_ip
        return request.remote_addr
    except Exception:
        return None