Coverage for app_modules/images_service.py: 44%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.4, created at 2025-08-20 00:55 +0200

1""" 

2images_service.py 

3 

4Purpose: 

5 Encapsulate image discovery and grouping logic. Responsible for scanning 

6 `static/User-photos/`, merging DB hints (file_paths), and grouping images for 

7 the gallery views. 

8 

9Exports: 

10 - collect_user_images(user_id, db, static_path) -> list[dict] 

11 - latest_from_db_or_fs(cam_id, file_paths, static_path) -> (datetime|None, rel|None) 

12 - group_images_by_time(items) -> list[section] 

13""" 

14 

15import os 

16from datetime import datetime 

17from .helpers import parse_ts_from_any, build_media_url 

18 

19 

def collect_user_images(user_id: int, db, static_path: str):
    """Collect every gallery image that belongs to one user's cameras.

    Images come from two sources and are merged by their path relative to
    ``static_path`` so a file found by both appears only once:

    1. The comma-separated ``file_paths`` column of the user's rows in the
       ``cameras`` table (only entries that exist on disk as regular files).
    2. A recursive walk of ``<static_path>/User-photos`` for image files whose
       name parses to one of the user's camera ids.

    Args:
        user_id: Value bound to ``cameras.user_id`` in the lookup query.
        db: Object whose ``execute(sql, params).fetchall()`` yields rows that
            can be indexed by column name.
        static_path: Directory that relative image paths are joined to.

    Returns:
        A list of dicts with keys ``rel``, ``url``, ``ts`` and ``camera_id``,
        sorted newest first.
    """
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp')

    def _mtime(path):
        # A file can vanish or become unreadable between discovery and stat;
        # report that as None instead of failing the whole listing.
        try:
            return datetime.fromtimestamp(os.path.getmtime(path))
        except (OSError, OverflowError, ValueError):
            return None

    images_by_rel = {}
    cam_rows = db.execute(
        'SELECT camera_id, file_paths FROM cameras WHERE user_id = ?',
        (user_id,),
    ).fetchall()
    # Camera ids are compared as strings against the id parsed from file names.
    user_cam_ids = {str(r['camera_id']) for r in cam_rows}

    # Seed from DB file_paths when present.
    for row in cam_rows:
        for raw in (row['file_paths'] or '').split(','):
            raw = raw.strip()
            if not raw:
                continue
            rel = _normalize_to_static_user_photos(raw)
            ts, cam = parse_ts_from_any(rel)
            abs_path = os.path.join(static_path, rel)
            # isfile rather than exists: a hint that names a directory is not
            # an image and must not be listed.
            if not os.path.isfile(abs_path):
                continue
            ts = ts or _mtime(abs_path)
            if ts is None:
                continue
            images_by_rel[rel] = {
                'rel': rel,
                'url': build_media_url(rel),
                'ts': ts,
                # Fall back to the owning row when the path encodes no camera.
                'camera_id': cam or str(row['camera_id']),
            }

    # Filesystem scan.
    root = os.path.join(static_path, 'User-photos')
    for dirpath, _, files in os.walk(root):
        for name in files:
            if not name.lower().endswith(image_exts):
                continue
            ts, cam = parse_ts_from_any(name)
            if not cam or cam not in user_cam_ids:
                continue
            abs_path = os.path.join(dirpath, name)
            rel = os.path.relpath(abs_path, static_path).replace('\\', '/')
            if rel in images_by_rel:
                # Already seeded from the DB hints; keep that entry.
                continue
            ts = ts or _mtime(abs_path)
            if ts is None:
                continue
            images_by_rel[rel] = {
                'rel': rel,
                'url': build_media_url(rel),
                'ts': ts,
                'camera_id': cam,
            }

    return sorted(
        images_by_rel.values(),
        key=lambda img: img['ts'] or datetime.min,
        reverse=True,
    )

61 

62 

def latest_from_db_or_fs(cam_id: str, file_paths: str, static_path: str):
    """Find the most recent image of one camera from DB hints and the disk.

    Candidates are the entries of the comma-separated ``file_paths`` string
    plus every image file under ``<static_path>/User-photos`` (recursive).
    A candidate counts only when the camera id parsed from its path/name
    equals ``cam_id``. Its timestamp is the one parsed from the name or,
    failing that, the file's modification time.

    Args:
        cam_id: Camera identifier; coerced to ``str`` before comparison.
        file_paths: Comma-separated path hints; may be empty or ``None``.
        static_path: Directory that relative image paths are joined to.

    Returns:
        ``(latest_dt, latest_rel)`` for the newest matching image, or
        ``(None, None)`` when nothing matches.
    """
    image_exts = ('.jpg', '.jpeg', '.png', '.gif', '.webp')

    # collect_user_images compares parsed camera ids against str(camera_id),
    # so an integer id passed here would otherwise never match anything.
    if cam_id is not None:
        cam_id = str(cam_id)

    def _mtime(path):
        # A file can vanish or become unreadable between discovery and stat;
        # treat that as "no timestamp" instead of raising.
        try:
            return datetime.fromtimestamp(os.path.getmtime(path))
        except (OSError, OverflowError, ValueError):
            return None

    latest_dt = None
    latest_rel = None

    # DB file_paths hints.
    # NOTE(review): hints whose path encodes no camera id are skipped here,
    # while collect_user_images attributes them to the owning row - confirm
    # whether this difference is intended.
    for raw in (file_paths or '').split(','):
        raw = raw.strip()
        if not raw:
            continue
        rel = _normalize_to_static_user_photos(raw)
        ts, cam = parse_ts_from_any(rel)
        if cam != cam_id:
            continue
        abs_path = os.path.join(static_path, rel)
        # isfile rather than exists: a hint naming a directory is not an image.
        if not os.path.isfile(abs_path):
            continue
        candidate_dt = ts or _mtime(abs_path)
        if candidate_dt and (latest_dt is None or candidate_dt > latest_dt):
            latest_dt = candidate_dt
            latest_rel = rel

    # Scan filesystem (recursive).
    root = os.path.join(static_path, 'User-photos')
    for dirpath, _, files in os.walk(root):
        for name in files:
            if not name.lower().endswith(image_exts):
                continue
            ts, cam = parse_ts_from_any(name)
            if cam != cam_id:
                continue
            abs_path = os.path.join(dirpath, name)
            candidate_dt = ts or _mtime(abs_path)
            if candidate_dt and (latest_dt is None or candidate_dt > latest_dt):
                latest_dt = candidate_dt
                latest_rel = os.path.relpath(abs_path, static_path).replace('\\', '/')

    return latest_dt, latest_rel

94 

95 

def group_images_by_time(items):
    """Bucket image dicts into recency sections for the gallery.

    Each item is placed by the calendar date of its ``'ts'`` value relative
    to the current local date:

    * ``'Danas'`` - today, and any future-dated timestamp so that the newest
      items stay at the top instead of falling into a later section.
    * ``'Jučer'`` - one day ago.
    * ``'Zadnjih 7 dana'`` - two to seven days ago.
    * ``'Ovaj mjesec'`` - older than that but within the current month.
    * ``'Starije od mjesec dana'`` - everything else, including items whose
      ``'ts'`` is missing or not a ``datetime``.

    Args:
        items: Iterable of dict-like objects supporting ``.get('ts')``.

    Returns:
        A list of ``{'title': str, 'items': list}`` dicts in the order above,
        omitting empty sections; items within a section are newest first.
    """
    today = datetime.now().date()
    start_of_month = today.replace(day=1)
    titles = (
        'Danas',
        'Jučer',
        'Zadnjih 7 dana',
        'Ovaj mjesec',
        'Starije od mjesec dana',
    )
    sections = {title: [] for title in titles}

    def _sort_key(item):
        # Anything that is not a datetime (None, str, date, ...) sorts as the
        # oldest possible value; comparing it to a datetime would raise.
        ts = item.get('ts')
        return ts if isinstance(ts, datetime) else datetime.min

    for it in items:
        ts = it.get('ts')
        if not isinstance(ts, datetime):
            sections['Starije od mjesec dana'].append(it)
            continue
        d = ts.date()
        delta_days = (today - d).days
        if delta_days <= 0:
            sections['Danas'].append(it)
        elif delta_days == 1:
            sections['Jučer'].append(it)
        elif delta_days <= 7:
            sections['Zadnjih 7 dana'].append(it)
        elif d >= start_of_month:
            sections['Ovaj mjesec'].append(it)
        else:
            sections['Starije od mjesec dana'].append(it)

    ordered = []
    for title in titles:
        bucket = sections[title]
        if bucket:
            bucket.sort(key=_sort_key, reverse=True)
            ordered.append({'title': title, 'items': bucket})
    return ordered

131 

132 

133def _normalize_to_static_user_photos(p: str) -> str: 

134 p = p.replace('\\', '/').lstrip('/') 

135 if p.startswith('static/'): 

136 p = p[len('static/') : ] 

137 if 'User-photos/' in p: 

138 p = p[p.find('User-photos/') : ] 

139 elif '/' not in p: 

140 p = f'User-photos/{p}' 

141 return p 

142 

143