Coverage for app_modules/helpers.py: 64%

90 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2helpers.py 

3 

4Purpose: 

5 Miscellaneous utilities reused across routes: 

6 - Timestamp parsing from camera filenames 

7 - Human-readable formatting 

8 - Normalization of paths into the `static/User-photos` namespace 

9 

10Exports: 

11 - parse_ts_from_any(path_or_name) -> (datetime|None, camera_id|None) 

12 - format_dt(datetime|None) -> str 

13 - normalize_to_static_user_photos(path: str) -> str 

14""" 

15 

16import os 

17import re 

18from datetime import datetime 

19import base64 

20import hmac 

21import hashlib 

22from flask import current_app, url_for 

23 

24 

def parse_ts_from_any(path_or_name: str):
    """Extract the capture timestamp and camera id from a camera filename.

    The last path component must start with
    ``PICT_<8 digits>_<6 digits>_<12 alphanumeric characters>`` (matched
    case-insensitively). The 8-digit date is read as ``YYYYDDMM`` first and,
    only if that is not a valid calendar date, as ``YYYYMMDD``.

    Args:
        path_or_name: A bare filename or a path. Forward slashes and
            backslashes are both treated as separators on every platform.

    Returns:
        ``(datetime, camera_id)`` on success; ``(None, camera_id)`` when the
        name matches but neither date order yields a valid timestamp;
        ``(None, None)`` when the name does not match or the input is empty
        or ``None``.
    """
    if not path_or_name:
        return None, None
    # os.path.basename only splits on backslashes when running on Windows,
    # so Windows-style paths are normalized to forward slashes first.
    name = os.path.basename(path_or_name.replace('\\', '/'))
    m = re.match(r"PICT_(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})_([A-Za-z0-9]{12})", name, re.IGNORECASE)
    if not m:
        return None, None
    year, a, b, hh, mm, ss, cam = m.groups()
    # Primary format in data appears to be YYYYDDMM; YYYYMMDD is the fallback.
    for day, month in ((a, b), (b, a)):
        try:
            ts = datetime(int(year), int(month), int(day), int(hh), int(mm), int(ss))
        except ValueError:
            # Out-of-range field for this ordering; try the next one.
            continue
        return ts, cam
    return None, cam

45 

46 

47def format_dt(dt: datetime|None) -> str: 

48 if not dt: 

49 return 'Nema aktivnosti' 

50 return dt.strftime('%d.%m.%Y %H:%M') 

51 

52 

def normalize_to_static_user_photos(path: str) -> str:
    """Map a path onto the ``User-photos/`` relative namespace.

    Steps, in order:
      * an empty/falsy path becomes ``'User-photos/'``;
      * backslashes become forward slashes and leading slashes are dropped;
      * one leading ``static/`` prefix is removed;
      * if ``User-photos/`` occurs anywhere, everything before its first
        occurrence is discarded;
      * otherwise a bare name (no ``/``) is placed under ``User-photos/``;
      * any other path containing ``/`` is returned as cleaned, unchanged.
    """
    marker = 'User-photos/'
    if not path:
        return marker

    cleaned = path.replace('\\', '/').lstrip('/').removeprefix('static/')

    start = cleaned.find(marker)
    if start != -1:
        return cleaned[start:]
    if '/' in cleaned:
        return cleaned
    return marker + cleaned

64 

65 

66# -------- Secure media token helpers -------- 

67 

def _b64url_encode(data: bytes) -> str:
    """Encode bytes as URL-safe base64 text with the trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    return encoded.rstrip(b'=').decode('utf-8')

70 

71 

def _b64url_decode(data: str) -> bytes:
    """Decode URL-safe base64 text whose ``=`` padding may have been stripped."""
    # Base64 text length must be a multiple of four; restore what is missing.
    missing = -len(data) % 4
    padded = data + '=' * missing
    return base64.urlsafe_b64decode(padded)

75 

76 

def generate_media_token(rel_path: str) -> str:
    """Create a deterministic signed token for a relative path under User-photos.

    Token format: base64url(rel) + '.' + base64url(HMAC_SHA256(secret, rel)),
    where ``rel`` is ``rel_path`` after ``normalize_to_static_user_photos``
    and ``secret`` is the application's ``SECRET_KEY``.

    No expiry (stable), access still requires auth + ownership in the
    serving route.

    Args:
        rel_path: Path of the media file; normalized before signing.

    Returns:
        The token string.
    """
    rel = normalize_to_static_user_photos(rel_path).encode('utf-8')
    key = current_app.config.get('SECRET_KEY') or b''
    # Flask accepts SECRET_KEY as either str or bytes; calling .encode() on a
    # bytes key would raise AttributeError, so only text keys are encoded.
    # NOTE(review): with SECRET_KEY unset the HMAC key is empty and the
    # signature can be computed by anyone - make sure the app configures one.
    secret = key if isinstance(key, bytes) else str(key).encode('utf-8')
    mac = hmac.new(secret, rel, hashlib.sha256).digest()
    return f"{_b64url_encode(rel)}.{_b64url_encode(mac)}"

86 

87 

def build_media_url(rel_path: str) -> str:
    """Return the URL of the ``media_routes.media_get`` endpoint for ``rel_path``.

    The path is carried in the URL as a signed media token produced by
    ``generate_media_token``.
    """
    return url_for('media_routes.media_get', token=generate_media_token(rel_path))

91 

92 

def resolve_media_token(token: str) -> str | None:
    """Verify a media token and return normalized relative path, or None if invalid.

    Expected format: base64url(rel) + '.' + base64url(HMAC_SHA256(secret, rel)).
    The decoded path is normalized with ``normalize_to_static_user_photos``
    before the signature is recomputed and compared in constant time.

    Args:
        token: Token string taken from the request.

    Returns:
        The normalized relative path when the signature matches; ``None`` for
        a malformed token, a signature mismatch, or when the application has
        no ``SECRET_KEY`` configured.
    """
    try:
        key = current_app.config.get('SECRET_KEY')
        if not key:
            # Fail closed: an HMAC under an empty key can be computed by
            # anyone, so it must never be accepted as proof of authenticity.
            return None
        # Flask accepts SECRET_KEY as either str or bytes.
        secret = key if isinstance(key, bytes) else str(key).encode('utf-8')
        b64_rel, b64_mac = token.split('.', 1)
        rel = normalize_to_static_user_photos(_b64url_decode(b64_rel).decode('utf-8'))
        expected = hmac.new(secret, rel.encode('utf-8'), hashlib.sha256).digest()
        provided = _b64url_decode(b64_mac)
    except Exception:
        # Tokens are untrusted input: any parsing/decoding failure means
        # "invalid token" rather than an error for the caller.
        return None
    return rel if hmac.compare_digest(expected, provided) else None

107 

108 

109# -------- Expiring public share tokens -------- 

110 

def generate_share_token(rel_path: str, expires_at_epoch: int) -> str:
    """Create an expiring token that can be used publicly without a session.

    Format: base64url(rel) + "." + str(exp) + "." + base64url(HMAC(secret, rel|exp|public)),
    where ``rel`` is ``rel_path`` after ``normalize_to_static_user_photos``,
    ``exp`` is ``int(expires_at_epoch)`` and the HMAC uses SHA-256 keyed with
    the application's ``SECRET_KEY``.

    Args:
        rel_path: Path of the media file; normalized before signing.
        expires_at_epoch: Expiry value embedded in the token and covered by
            the signature; coerced with ``int()``.

    Returns:
        The token string.
    """
    rel = normalize_to_static_user_photos(rel_path)
    exp = int(expires_at_epoch)
    key = current_app.config.get('SECRET_KEY') or b''
    # Flask accepts SECRET_KEY as either str or bytes; calling .encode() on a
    # bytes key would raise AttributeError, so only text keys are encoded.
    # NOTE(review): with SECRET_KEY unset the HMAC key is empty and the
    # signature can be computed by anyone - make sure the app configures one.
    secret = key if isinstance(key, bytes) else str(key).encode('utf-8')
    msg = f"{rel}|{exp}|public".encode('utf-8')
    mac = hmac.new(secret, msg, hashlib.sha256).digest()
    return f"{_b64url_encode(rel.encode('utf-8'))}.{exp}.{_b64url_encode(mac)}"

120 

121 

def build_share_url(rel_path: str, expires_at_epoch: int) -> str:
    """Return an external URL of ``media_routes.media_get`` carrying a share token.

    The token is produced by ``generate_share_token`` for ``rel_path`` and
    ``expires_at_epoch``; ``_external=True`` is passed to ``url_for``.
    """
    share_token = generate_share_token(rel_path, expires_at_epoch)
    return url_for(
        'media_routes.media_get',
        token=share_token,
        _external=True,
    )

125 

126 

def resolve_share_token(token: str) -> tuple[str, int] | None:
    """Verify a share token and return (rel, exp) or None if invalid.

    Expected format:
    base64url(rel) + "." + str(exp) + "." + base64url(HMAC(secret, rel|exp|public)).
    The decoded path is normalized with ``normalize_to_static_user_photos``
    before the signature is recomputed and compared in constant time.

    This function only authenticates the token; it does not compare ``exp``
    with the current time, so the caller is responsible for rejecting
    expired tokens.

    Args:
        token: Token string taken from the request.

    Returns:
        ``(rel, exp)`` when the signature matches; ``None`` for a malformed
        token, a signature mismatch, or when the application has no
        ``SECRET_KEY`` configured.
    """
    try:
        key = current_app.config.get('SECRET_KEY')
        if not key:
            # Fail closed: share tokens work without a session, and an HMAC
            # under an empty key can be computed by anyone.
            return None
        # Flask accepts SECRET_KEY as either str or bytes.
        secret = key if isinstance(key, bytes) else str(key).encode('utf-8')
        b64_rel, exp_s, b64_mac = token.split('.', 2)
        rel = normalize_to_static_user_photos(_b64url_decode(b64_rel).decode('utf-8'))
        exp = int(exp_s)
        msg = f"{rel}|{exp}|public".encode('utf-8')
        expected = hmac.new(secret, msg, hashlib.sha256).digest()
        provided = _b64url_decode(b64_mac)
    except Exception:
        # Tokens are untrusted input: any parsing/decoding failure means
        # "invalid token" rather than an error for the caller.
        return None
    if not hmac.compare_digest(expected, provided):
        return None
    return rel, exp

143 

144