Coverage for app_modules/audit.py: 88%

33 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2audit.py 

3 

4Purpose: 

5 Centralized audit logging for authentication events and administrative 

6 actions. This module provides small, focused helpers to record security- 

7 relevant events into the SQLite database, so operators have traceability for 

8 incident response. 

9 

10How it works: 

11 - Lazily ensures minimal schemas for two tables: `auth_log` and 

12 `admin_audit`. 

13 - Exposes `log_auth_event(...)` and `log_admin_action(...)` with simple 

14 parameters to record events. Timestamps are stored as UNIX epoch seconds. 

15 - Keeps the code path lightweight; failures to log never break requests. 

16 

17Data model: 

18 - auth_log(id, ts, ip, user_id, username, event, detail) 

19 - admin_audit(id, ts, ip, admin_user_id, target_user_id, action, detail) 

20""" 

21 

22from __future__ import annotations 

23 

24import time 

25from typing import Optional 

26 

27from .db import get_db 

28 

29 

def _now() -> int:
    """Return the current wall-clock time as whole UNIX epoch seconds."""
    # time.time() yields fractional seconds; int() truncates the fraction.
    epoch_seconds = time.time()
    return int(epoch_seconds)

32 

33 

def _ensure_schema() -> None:
    """Create the `auth_log` and `admin_audit` tables when they do not exist.

    Both statements use `CREATE TABLE IF NOT EXISTS`, so calling this
    repeatedly is harmless. The connection comes from `get_db()` and is
    committed once after both statements have run.
    """
    auth_log_ddl = """
        CREATE TABLE IF NOT EXISTS auth_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            ip TEXT,
            user_id INTEGER,
            username TEXT,
            event TEXT NOT NULL,
            detail TEXT
        )
        """
    admin_audit_ddl = """
        CREATE TABLE IF NOT EXISTS admin_audit (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            ts INTEGER NOT NULL,
            ip TEXT,
            admin_user_id INTEGER NOT NULL,
            target_user_id INTEGER,
            action TEXT NOT NULL,
            detail TEXT
        )
        """

    conn = get_db()
    # Same order as the data model in the module docstring: auth first,
    # then admin; a single commit covers both.
    for ddl in (auth_log_ddl, admin_audit_ddl):
        conn.execute(ddl)
    conn.commit()

63 

64 

def log_auth_event(event: str, *, username: Optional[str] = None, user_id: Optional[int] = None,
                   ip: Optional[str] = None, detail: Optional[str] = None) -> None:
    """Record an authentication event in the `auth_log` table (best-effort).

    Ensures the audit schema exists, inserts one row stamped with the value
    of `_now()`, and commits. Any failure is reported through the standard
    `logging` module and then swallowed, so a broken audit trail never breaks
    the calling request.

    Args:
        event: Event name stored in the NOT NULL `event` column.
        username: Value for the `username` column, if known.
        user_id: Value for the `user_id` column, if known.
        ip: Value for the `ip` column, if known.
        detail: Free-form text stored in the `detail` column.

    Returns:
        None. This function never raises for ordinary (`Exception`) failures.
    """
    try:
        _ensure_schema()
        db = get_db()
        db.execute(
            'INSERT INTO auth_log (ts, ip, user_id, username, event, detail) VALUES (?, ?, ?, ?, ?, ?)',
            (_now(), ip, user_id, username, event, detail),
        )
        db.commit()
    except Exception:
        # Auditing must be best-effort only; never break requests. Still leave
        # a trace so operators can tell that audit records are being dropped.
        import logging

        logging.getLogger(__name__).warning(
            'Failed to record auth event %r', event, exc_info=True
        )

78 

79 

def log_admin_action(action: str, *, admin_user_id: int, target_user_id: Optional[int] = None,
                     ip: Optional[str] = None, detail: Optional[str] = None) -> None:
    """Record an administrative action in the `admin_audit` table (best-effort).

    Ensures the audit schema exists, inserts one row stamped with the value
    of `_now()`, and commits. Any failure is reported through the standard
    `logging` module and then swallowed, so a broken audit trail never breaks
    the calling request.

    Args:
        action: Action name stored in the NOT NULL `action` column.
        admin_user_id: Id of the acting administrator (NOT NULL column).
        target_user_id: Id of the affected user, if any.
        ip: Value for the `ip` column, if known.
        detail: Free-form text stored in the `detail` column.

    Returns:
        None. This function never raises for ordinary (`Exception`) failures.
    """
    try:
        _ensure_schema()
        db = get_db()
        db.execute(
            'INSERT INTO admin_audit (ts, ip, admin_user_id, target_user_id, action, detail) VALUES (?, ?, ?, ?, ?, ?)',
            (_now(), ip, admin_user_id, target_user_id, action, detail),
        )
        db.commit()
    except Exception:
        # Auditing must be best-effort only; never break requests. Still leave
        # a trace so operators can tell that audit records are being dropped.
        import logging

        logging.getLogger(__name__).warning(
            'Failed to record admin action %r', action, exc_info=True
        )

92 

93 

def get_request_ip(request) -> Optional[str]:
    """Return the client IP for `request`, or None if it cannot be determined.

    The first comma-separated entry of the `X-Forwarded-For` header wins when
    it is non-empty after stripping whitespace; otherwise
    `request.remote_addr` is returned. Any exception while reading the
    request yields None.

    NOTE(review): `X-Forwarded-For` is taken at face value here. Unless a
    trusted proxy overwrites it, a client can set it freely -- confirm the
    deployment before relying on this value in audit records.
    """
    try:
        header_value = request.headers.get('X-Forwarded-For') or ''
        first_hop = header_value.split(',')[0].strip()
        if first_hop:
            return first_hop
        return request.remote_addr
    except Exception:
        return None

100 

101